From 65ffde9e4b909310011cdf1ce36d7a6d90b05e0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 16:20:31 +0200 Subject: [PATCH 1/7] fsm: Config for max WaitDeals sectors --- extern/storage-sealing/fsm.go | 6 +- extern/storage-sealing/sealing.go | 109 +++++++++++++++++++++++------- extern/storage-sealing/types.go | 4 +- node/builder.go | 4 +- node/config/def.go | 21 ++++-- node/impl/storminer.go | 19 ++++-- node/modules/dtypes/miner.go | 5 +- node/modules/storageminer.go | 26 ++++--- storage/miner.go | 14 ++-- 9 files changed, 151 insertions(+), 57 deletions(-) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index fd9ff2d29..ddfebad24 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -328,7 +328,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error { log.Errorf("loading sector list: %+v", err) } - sd, err := m.getSealDelay() + cfg, err := m.getConfig() if err != nil { return xerrors.Errorf("getting the sealing delay: %w", err) } @@ -339,8 +339,8 @@ func (m *Sealing) restartSectors(ctx context.Context) error { } if sector.State == WaitDeals { - if sd > 0 { - timer := time.NewTimer(sd) + if cfg.WaitDealsDelay > 0 { + timer := time.NewTimer(cfg.WaitDealsDelay) go func() { <-timer.C m.StartPacking(sector.SectorNumber) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index eca387da3..7ae2e5163 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -3,6 +3,7 @@ package sealing import ( "context" "io" + "math" "sync" "time" @@ -33,6 +34,16 @@ type SectorLocation struct { Partition uint64 } +type Config struct { + // 0 = no limit + MaxWaitDealsSectors uint64 + + // includes failed, 0 = no limit + MaxSealingSectors uint64 + + WaitDealsDelay time.Duration +} + type SealingAPI interface { StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) StateComputeDataCommitment(ctx context.Context, maddr 
address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) @@ -70,7 +81,7 @@ type Sealing struct { upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} - getSealDelay GetSealingDelayFunc + getConfig GetSealingConfigFunc } type FeeConfig struct { @@ -80,7 +91,7 @@ type FeeConfig struct { type UnsealedSectorMap struct { infos map[abi.SectorNumber]UnsealedSectorInfo - mux sync.Mutex + lk sync.Mutex } type UnsealedSectorInfo struct { @@ -90,7 +101,7 @@ type UnsealedSectorInfo struct { pieceSizes []abi.UnpaddedPieceSize } -func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing { +func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing { s := &Sealing{ api: api, feeCfg: fc, @@ -103,11 +114,11 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds pcp: pcp, unsealedInfoMap: UnsealedSectorMap{ infos: make(map[abi.SectorNumber]UnsealedSectorInfo), - mux: sync.Mutex{}, + lk: sync.Mutex{}, }, - toUpgrade: map[abi.SectorNumber]struct{}{}, - getSealDelay: gsd, + toUpgrade: map[abi.SectorNumber]struct{}{}, + getConfig: gc, } s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) @@ -137,18 +148,18 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return 0, 0, xerrors.Errorf("piece cannot fit into a sector") } - m.unsealedInfoMap.mux.Lock() + m.unsealedInfoMap.lk.Lock() sid, pads, err := m.getSectorAndPadding(size) if err != nil { - m.unsealedInfoMap.mux.Unlock() + m.unsealedInfoMap.lk.Unlock() return 0, 0, xerrors.Errorf("getting available sector: %w", err) 
} for _, p := range pads { err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil) if err != nil { - m.unsealedInfoMap.mux.Unlock() + m.unsealedInfoMap.lk.Unlock() return 0, 0, xerrors.Errorf("writing pads: %w", err) } } @@ -157,12 +168,15 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec err = m.addPiece(ctx, sid, size, r, &d) if err != nil { - m.unsealedInfoMap.mux.Unlock() + m.unsealedInfoMap.lk.Unlock() return 0, 0, xerrors.Errorf("adding piece to sector: %w", err) } - m.unsealedInfoMap.mux.Unlock() - if m.unsealedInfoMap.infos[sid].numDeals == getDealPerSectorLimit(m.sealer.SectorSize()) { + startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize()) + + m.unsealedInfoMap.lk.Unlock() + + if startPacking { if err := m.StartPacking(sid); err != nil { return 0, 0, xerrors.Errorf("start packing: %w", err) } @@ -171,7 +185,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec return sid, offset, nil } -// Caller should hold m.unsealedInfoMap.mux +// Caller should hold m.unsealedInfoMap.lk func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error { log.Infof("Adding piece to sector %d", sectorID) ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r) @@ -206,7 +220,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } -// Caller should NOT hold m.unsealedInfoMap.mux +// Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { log.Infof("Starting packing sector %d", sectorID) err := m.sectors.Send(uint64(sectorID), SectorStartPacking{}) @@ -214,14 +228,14 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { return 
err } - m.unsealedInfoMap.mux.Lock() + m.unsealedInfoMap.lk.Lock() delete(m.unsealedInfoMap.infos, sectorID) - m.unsealedInfoMap.mux.Unlock() + m.unsealedInfoMap.lk.Unlock() return nil } -// Caller should hold m.unsealedInfoMap.mux +// Caller should hold m.unsealedInfoMap.lk func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) { ss := abi.PaddedPieceSize(m.sealer.SectorSize()) for k, v := range m.unsealedInfoMap.infos { @@ -231,7 +245,7 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum } } - ns, err := m.newSector() + ns, err := m.newDealSector() if err != nil { return 0, nil, err } @@ -245,8 +259,57 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum return ns, nil, nil } -// newSector creates a new sector for deal storage -func (m *Sealing) newSector() (abi.SectorNumber, error) { +// newDealSector creates a new sector for deal storage +func (m *Sealing) newDealSector() (abi.SectorNumber, error) { + // First make sure we don't have too many 'open' sectors + + cfg, err := m.getConfig() + if err != nil { + return 0, xerrors.Errorf("getting config: %w", err) + } + + if cfg.MaxWaitDealsSectors > 0 { + // run in a loop because we have to drop the map lock here for a bit + tries := 0 + + for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { + if tries > 10 { + // whatever... 
+ break + } + + if tries > 0 { + m.unsealedInfoMap.lk.Unlock() + time.Sleep(time.Second) + m.unsealedInfoMap.lk.Lock() + } + + tries++ + var mostStored abi.PaddedPieceSize = math.MaxUint64 + var best abi.SectorNumber = math.MaxUint64 + + for sn, info := range m.unsealedInfoMap.infos { + if info.stored + 1 > mostStored + 1 { // 18446744073709551615 + 1 = 0 + best, mostStored = sn, info.stored + } + } + + if best == math.MaxUint64 { + // probably not possible, but who knows + break + } + + m.unsealedInfoMap.lk.Unlock() + if err := m.StartPacking(best); err != nil { + log.Errorf("newDealSector StartPacking error: %+v", err) + continue // let's pretend this is fine + } + m.unsealedInfoMap.lk.Lock() + } + } + + // Now actually create a new sector + sid, err := m.sc.Next() if err != nil { return 0, xerrors.Errorf("getting sector number: %w", err) } @@ -272,13 +335,13 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) { return 0, xerrors.Errorf("starting the sector fsm: %w", err) } - sd, err := m.getSealDelay() + cf, err := m.getConfig() if err != nil { return 0, xerrors.Errorf("getting the sealing delay: %w", err) } - if sd > 0 { - timer := time.NewTimer(sd) + if cf.WaitDealsDelay > 0 { + timer := time.NewTimer(cf.WaitDealsDelay) go func() { <-timer.C m.StartPacking(sid) diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 45993bb82..1e7c9f76c 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -3,8 +3,6 @@ package sealing import ( "bytes" "context" - "time" - "github.com/ipfs/go-cid" "github.com/filecoin-project/specs-actors/actors/abi" @@ -188,7 +186,7 @@ type MessageReceipt struct { GasUsed int64 } -type GetSealingDelayFunc func() (time.Duration, error) +type GetSealingConfigFunc func() (Config, error) func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed diff --git a/node/builder.go b/node/builder.go index 
7f1247084..cf07a660a 100644 --- a/node/builder.go +++ b/node/builder.go @@ -328,8 +328,8 @@ func Online() Option { Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc), Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc), Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc), - Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc), - Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc), + Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc), + Override(new(dtypes.GetSealingConfigFunc), modules.NewGetSealConfigFunc), Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc), Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc), ), diff --git a/node/config/def.go b/node/config/def.go index 436935916..fcddf73ba 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -31,10 +31,9 @@ type StorageMiner struct { Common Dealmaking DealmakingConfig + Sealing SealingConfig Storage sectorstorage.SealerConfig Fees MinerFeeConfig - - SealingDelay Duration } type DealmakingConfig struct { @@ -48,6 +47,16 @@ type DealmakingConfig struct { Filter string } +type SealingConfig struct { + // 0 = no limit + MaxWaitDealsSectors uint64 + + // includes failed, 0 = no limit + MaxSealingSectors uint64 + + WaitDealsDelay Duration +} + type MinerFeeConfig struct { MaxPreCommitGasFee types.FIL MaxCommitGasFee types.FIL @@ -130,6 +139,12 @@ func DefaultFullNode() *FullNode { func DefaultStorageMiner() *StorageMiner { cfg := &StorageMiner{ Common: defCommon(), + + Sealing: SealingConfig{ + MaxWaitDealsSectors: 2, // 64G with 32G sectors + MaxSealingSectors: 0, + WaitDealsDelay: Duration(time.Hour), + }, Storage: sectorstorage.SealerConfig{ AllowAddPiece: true, @@ -158,8 +173,6 @@ func 
DefaultStorageMiner() *StorageMiner { MaxCommitGasFee: types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))), MaxWindowPoStGasFee: types.FIL(types.FromFil(50)), }, - - SealingDelay: Duration(time.Hour), } cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345" diff --git a/node/impl/storminer.go b/node/impl/storminer.go index e302f7051..77052164b 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -62,8 +62,8 @@ type StorageMinerAPI struct { SetConsiderOfflineStorageDealsConfigFunc dtypes.SetConsiderOfflineStorageDealsConfigFunc ConsiderOfflineRetrievalDealsConfigFunc dtypes.ConsiderOfflineRetrievalDealsConfigFunc SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc - SetSealingDelayFunc dtypes.SetSealingDelayFunc - GetSealingDelayFunc dtypes.GetSealingDelayFunc + SetSealingConfigFunc dtypes.SetSealingConfigFunc + GetSealingConfigFunc dtypes.GetSealingConfigFunc GetExpectedSealDurationFunc dtypes.GetExpectedSealDurationFunc SetExpectedSealDurationFunc dtypes.SetExpectedSealDurationFunc } @@ -232,11 +232,22 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se } func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error { - return sm.SetSealingDelayFunc(delay) + cfg, err := sm.GetSealingConfigFunc() + if err != nil { + return xerrors.Errorf("get config: %w", err) + } + + cfg.WaitDealsDelay = delay + + return sm.SetSealingConfigFunc(cfg) } func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) { - return sm.GetSealingDelayFunc() + cfg, err := sm.GetSealingConfigFunc() + if err != nil { + return 0, err + } + return cfg.WaitDealsDelay, nil } func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error { diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index 34911df5e..600d19dc7 100644 
--- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -8,6 +8,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/specs-actors/actors/abi" ) @@ -56,10 +57,10 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error) type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error // SetSealingDelay sets how long a sector waits for more deals before sealing begins. -type SetSealingDelayFunc func(time.Duration) error +type SetSealingConfigFunc func(sealing.Config) error // GetSealingDelay returns how long a sector waits for more deals before sealing begins. -type GetSealingDelayFunc func() (time.Duration, error) +type GetSealingConfigFunc func() (sealing.Config, error) // SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take. // Deals that would need to start earlier than this duration will be rejected. 
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 7aec09482..e0cbb434e 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -141,8 +141,8 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter { return &sidsc{sc} } -func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) { +func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) { maddr, err := minerAddrFromDS(ds) if err != nil { return nil, err @@ -593,19 +593,27 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se }, nil } -func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) { - return func(delay time.Duration) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.SealingDelay = config.Duration(delay) +func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) { + return func(cfg sealing.Config) (err error) { + err = mutateCfg(r, func(c *config.StorageMiner) { + 
c.Sealing = config.SealingConfig{ + MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, + MaxSealingSectors: cfg.MaxSealingSectors, + WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), + } }) return }, nil } -func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) { - return func() (out time.Duration, err error) { +func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { + return func() (out sealing.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { - out = time.Duration(cfg.SealingDelay) + out = sealing.Config{ + MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, + MaxSealingSectors: cfg.Sealing.MaxSealingSectors, + WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + } }) return }, nil diff --git a/storage/miner.go b/storage/miner.go index f8a6691f4..803556cdf 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -43,8 +43,8 @@ type Miner struct { maddr address.Address worker address.Address - getSealDelay dtypes.GetSealingDelayFunc - sealing *sealing.Sealing + getSealConfig dtypes.GetSealingConfigFunc + sealing *sealing.Sealing } type storageMinerApi interface { @@ -84,7 +84,7 @@ type storageMinerApi interface { WalletHas(context.Context, address.Address) (bool, error) } -func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, feeCfg config.MinerFeeConfig) (*Miner, error) { +func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) { m := &Miner{ api: api, feeCfg: feeCfg, @@ -94,9 +94,9 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d sc: sc, verif: verif, - maddr: maddr, - 
worker: worker, - getSealDelay: gsd, + maddr: maddr, + worker: worker, + getSealConfig: gsd, } return m, nil @@ -120,7 +120,7 @@ func (m *Miner) Run(ctx context.Context) error { evts := events.NewEvents(ctx, m.api) adaptedAPI := NewSealingAPIAdapter(m.api) pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod) - m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay)) + m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig)) go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function From 6a49bd6d8e4a2200d49bbb28f4b9d0878f1a9fac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 16:54:05 +0200 Subject: [PATCH 2/7] fsm: Config for limiting max sealing sectors --- extern/storage-sealing/fsm.go | 2 ++ extern/storage-sealing/sealing.go | 19 +++++++++++ extern/storage-sealing/sector_state.go | 11 +++++++ extern/storage-sealing/stats.go | 45 ++++++++++++++++++++++++++ 4 files changed, 77 insertions(+) create mode 100644 extern/storage-sealing/stats.go diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index ddfebad24..4842e6023 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -225,6 +225,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta */ + m.stats.updateSector(m.minerSector(state.SectorNumber), state.State) + switch state.State { // Happy path case Empty: diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 7ae2e5163..3ae75e189 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -81,6 +81,8 @@ type Sealing struct { upgradeLk sync.Mutex toUpgrade 
map[abi.SectorNumber]struct{} + stats SectorStats + getConfig GetSealingConfigFunc } @@ -268,6 +270,12 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { return 0, xerrors.Errorf("getting config: %w", err) } + if cfg.MaxSealingSectors > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectors { + return 0, xerrors.Errorf("too many sectors sealing") + } + } + if cfg.MaxWaitDealsSectors > 0 { // run in a loop because we have to drop the map lock here for a bit tries := 0 @@ -353,6 +361,17 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting config: %w", err) + } + + if cfg.MaxSealingSectors > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectors { + return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) + } + } + rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index f2801c9fc..2f57d83e8 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -36,3 +36,14 @@ const ( RemoveFailed SectorState = "RemoveFailed" Removed SectorState = "Removed" ) + +func toStatState(st SectorState) statSectorState { + switch st { + case Empty, WaitDeals, Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitWait, FinalizeSector: + return sstSealing + case Proving, Removed, Removing: + return sstProving + } + + return sstFailed +} diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go new file mode 100644 index 000000000..3d31a39ff --- /dev/null +++ b/extern/storage-sealing/stats.go @@ -0,0 +1,45 @@ 
+package sealing + +import ( + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +type statSectorState int +const ( + sstSealing statSectorState = iota + sstFailed + sstProving + nsst +) + +type SectorStats struct { + lk sync.Mutex + + bySector map[abi.SectorID]statSectorState + totals [nsst]uint64 +} + +func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { + ss.lk.Lock() + defer ss.lk.Unlock() + + oldst, found := ss.bySector[id] + if found { + ss.totals[oldst]-- + } + + sst := toStatState(st) + ss.bySector[id] = sst + ss.totals[sst]++ +} + + +// return the number of sectors currently in the sealing pipeline +func (ss *SectorStats) curSealing() uint64 { + ss.lk.Lock() + defer ss.lk.Unlock() + + return ss.totals[sstSealing] + ss.totals[sstFailed] +} From 886d9cd5eb9ccb4fdb821d27d584bc30ca271c34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 18:02:13 +0200 Subject: [PATCH 3/7] fsm: handle already-precommitted CommitFailed sectors correctly --- extern/storage-sealing/checks.go | 15 +++++++++++++++ extern/storage-sealing/fsm.go | 3 +++ extern/storage-sealing/fsm_events.go | 4 ++++ extern/storage-sealing/sealing.go | 10 ++++++++++ extern/storage-sealing/states_failed.go | 12 ++++++++++++ extern/storage-sealing/states_sealing.go | 18 ++++++++++++++++++ storage/adapter_storage_miner.go | 23 ++++++++++++++++++++++- storage/miner.go | 1 + 8 files changed, 85 insertions(+), 1 deletion(-) diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 95ef101fa..af62b9548 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -26,10 +26,12 @@ type ErrBadCommD struct{ error } type ErrExpiredTicket struct{ error } type ErrBadTicket struct{ error } type ErrPrecommitOnChain struct{ error } +type ErrSectorNumberAllocated struct{ error } type ErrBadSeed struct{ error } type ErrInvalidProof struct{ error } type ErrNoPrecommit struct{ error } +type 
ErrCommitWaitFailed struct{ error } func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { tok, height, err := api.ChainHead(ctx) @@ -87,6 +89,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok) if err != nil { + if err == ErrSectorAllocated { + return &ErrSectorNumberAllocated{err} + } return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)} } @@ -106,6 +111,16 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, } pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok) + if err == ErrSectorAllocated { + // not much more we can check here, basically try to wait for commit, + // and hope that this will work + + if si.CommitMessage != nil { + return &ErrCommitWaitFailed{err} + } + + return err + } if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 4842e6023..d9648a99d 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -108,6 +108,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryPreCommitWait{}, PreCommitWait), on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorRetryPreCommit{}, PreCommitting), + on(SectorRetryCommitWait{}, CommitWait), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), @@ -317,6 +318,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { state.State = SealPreCommit1Failed case SectorCommitFailed: state.State = CommitFailed + case SectorRetryCommitWait: + state.State = CommitWait default: return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index c4278991e..f270b3668 100644 --- 
a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -252,6 +252,10 @@ func (evt SectorRetryInvalidProof) apply(state *SectorInfo) { state.InvalidProofs++ } +type SectorRetryCommitWait struct{} + +func (evt SectorRetryCommitWait) apply(state *SectorInfo) {} + // Faults type SectorFaulty struct{} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 3ae75e189..3a6bb8e5f 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -2,6 +2,7 @@ package sealing import ( "context" + "errors" "io" "math" "sync" @@ -44,9 +45,14 @@ type Config struct { WaitDealsDelay time.Duration } +var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain") + type SealingAPI interface { StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) + StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) + + // Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) @@ -121,6 +127,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds toUpgrade: map[abi.SectorNumber]struct{}{}, getConfig: gc, + + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, } s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, 
SectorInfo{}) diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index e208a8cca..cf829f44f 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -85,6 +85,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI return ctx.Send(SectorRetryPreCommit{}) case *ErrPrecommitOnChain: // noop + case *ErrSectorNumberAllocated: + log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) + // TODO: check if the sector is committed (not sure how we'd end up here) + return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) } @@ -158,6 +162,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) case *ErrPrecommitOnChain: // noop, this is expected + case *ErrSectorNumberAllocated: + // noop, already committed? 
default: return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err) } @@ -186,6 +192,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRetryPreCommitWait{}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) + case *ErrCommitWaitFailed: + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetryCommitWait{}) default: return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 2178ce0b4..110a73ac0 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -149,6 +149,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrPrecommitOnChain: return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit + case *ErrSectorNumberAllocated: + log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) + // TODO: check if the sector is committed (not sure how we'd end up here) + return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) } @@ -275,6 +279,20 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er } func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { + if sector.CommitMessage != nil { + log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber) + + ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + if err != nil { + log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err) + } + + if ml != nil { + // some weird retry paths can lead here + return ctx.Send(SectorRetryCommitWait{}) + } + } + 
log.Info("scheduling seal proof computation...") log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 8881e599e..1890a369f 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -108,6 +108,27 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal }, nil } +func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing.MsgLookup, error) { + wmsg, err := s.delegate.StateSearchMsg(ctx, c) + if err != nil { + return nil, err + } + + if wmsg == nil { + return nil, nil + } + + return &sealing.MsgLookup{ + Receipt: sealing.MessageReceipt{ + ExitCode: wmsg.Receipt.ExitCode, + Return: wmsg.Receipt.Return, + GasUsed: wmsg.Receipt.GasUsed, + }, + TipSetTok: wmsg.TipSet.Bytes(), + Height: wmsg.Height, + }, nil +} + func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { @@ -186,7 +207,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a return nil, xerrors.Errorf("checking if sector is allocated: %w", err) } if set { - return nil, xerrors.Errorf("sectorNumber is allocated") + return nil, sealing.ErrSectorAllocated } return nil, nil diff --git a/storage/miner.go b/storage/miner.go index 803556cdf..7baffee30 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -60,6 +60,7 @@ type storageMinerApi interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, 
error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) + StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error) From baf91f08a12962067d3235c75705025fb3699752 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 18:27:18 +0200 Subject: [PATCH 4/7] Unbreak tests --- extern/storage-sealing/fsm_test.go | 26 +++++++++++++++++++--- extern/storage-sealing/sealiface/config.go | 16 +++++++++++++ extern/storage-sealing/sealing.go | 10 --------- extern/storage-sealing/types.go | 3 ++- node/modules/dtypes/miner.go | 7 +++--- node/modules/storageminer.go | 7 +++--- 6 files changed, 49 insertions(+), 20 deletions(-) create mode 100644 extern/storage-sealing/sealiface/config.go diff --git a/extern/storage-sealing/fsm_test.go b/extern/storage-sealing/fsm_test.go index b1e53133c..474ea1966 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -3,6 +3,8 @@ package sealing import ( "testing" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/specs-actors/actors/abi" logging "github.com/ipfs/go-log/v2" "github.com/stretchr/testify/require" @@ -25,8 +27,14 @@ type test struct { } func TestHappyPath(t *testing.T) { + ma, _ := address.NewIDAddress(55151) m := test{ - s: &Sealing{}, + s: &Sealing{ + maddr: ma, + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, + }, t: t, state: &SectorInfo{State: Packing}, } @@ -60,8 +68,14 @@ func TestHappyPath(t *testing.T) { } func TestSeedRevert(t *testing.T) { + ma, _ := address.NewIDAddress(55151) m := test{ - s: &Sealing{}, + s: 
&Sealing{ + maddr: ma, + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, + }, t: t, state: &SectorInfo{State: Packing}, } @@ -101,8 +115,14 @@ func TestSeedRevert(t *testing.T) { } func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { + ma, _ := address.NewIDAddress(55151) m := test{ - s: &Sealing{}, + s: &Sealing{ + maddr: ma, + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, + }, t: t, state: &SectorInfo{State: Committing}, } diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go new file mode 100644 index 000000000..ad0b12c40 --- /dev/null +++ b/extern/storage-sealing/sealiface/config.go @@ -0,0 +1,16 @@ +package sealiface + +import "time" + +// this has to be in a separate package to not make lotus API depend on filecoin-ffi + +type Config struct { + // 0 = no limit + MaxWaitDealsSectors uint64 + + // includes failed, 0 = no limit + MaxSealingSectors uint64 + + WaitDealsDelay time.Duration +} + diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 3a6bb8e5f..006118e58 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -35,16 +35,6 @@ type SectorLocation struct { Partition uint64 } -type Config struct { - // 0 = no limit - MaxWaitDealsSectors uint64 - - // includes failed, 0 = no limit - MaxSealingSectors uint64 - - WaitDealsDelay time.Duration -} - var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain") type SealingAPI interface { diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 1e7c9f76c..a9c2a7203 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/specs-storage/storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" 
) // Piece is a tuple of piece and deal info @@ -186,7 +187,7 @@ type MessageReceipt struct { GasUsed int64 } -type GetSealingConfigFunc func() (Config, error) +type GetSealingConfigFunc func() (sealiface.Config, error) func (mr *MessageReceipt) Equals(o *MessageReceipt) bool { return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed diff --git a/node/modules/dtypes/miner.go b/node/modules/dtypes/miner.go index 600d19dc7..d559a2de1 100644 --- a/node/modules/dtypes/miner.go +++ b/node/modules/dtypes/miner.go @@ -8,8 +8,9 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/storagemarket" - sealing "github.com/filecoin-project/lotus/extern/storage-sealing" "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" ) type MinerAddress address.Address @@ -57,10 +58,10 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error) type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error // SetSealingDelay sets how long a sector waits for more deals before sealing begins. -type SetSealingConfigFunc func(sealing.Config) error +type SetSealingConfigFunc func(sealiface.Config) error // GetSealingDelay returns how long a sector waits for more deals before sealing begins. -type GetSealingConfigFunc func() (sealing.Config, error) +type GetSealingConfigFunc func() (sealiface.Config, error) // SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take. // Deals that would need to start earlier than this duration will be rejected. 
diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index e0cbb434e..3be869f0f 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -48,6 +48,7 @@ import ( "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/stores" sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" lapi "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" @@ -594,7 +595,7 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se } func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) { - return func(cfg sealing.Config) (err error) { + return func(cfg sealiface.Config) (err error) { err = mutateCfg(r, func(c *config.StorageMiner) { c.Sealing = config.SealingConfig{ MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, @@ -607,9 +608,9 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error } func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { - return func() (out sealing.Config, err error) { + return func() (out sealiface.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { - out = sealing.Config{ + out = sealiface.Config{ MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, MaxSealingSectors: cfg.Sealing.MaxSealingSectors, WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), From e7d65be90a5832afb4fd3e82c336ed27f477d4f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 18:27:28 +0200 Subject: [PATCH 5/7] gofmt --- cli/pprof.go | 5 ++--- cmd/lotus-storage-miner/config.go | 2 +- extern/storage-sealing/fsm_test.go | 24 +++++++++++----------- extern/storage-sealing/sealiface/config.go | 1 - extern/storage-sealing/sealing.go | 2 +- extern/storage-sealing/stats.go | 4 ++-- node/config/def.go | 4 ++-- 7 
files changed, 20 insertions(+), 22 deletions(-) diff --git a/cli/pprof.go b/cli/pprof.go index dff089e7f..50a67ef86 100644 --- a/cli/pprof.go +++ b/cli/pprof.go @@ -13,7 +13,7 @@ import ( ) var pprofCmd = &cli.Command{ - Name: "pprof", + Name: "pprof", Hidden: true, Subcommands: []*cli.Command{ PprofGoroutines, @@ -42,7 +42,7 @@ var PprofGoroutines = &cli.Command{ return err } - addr = "http://" + addr + "/debug/pprof/goroutine?debug=2" + addr = "http://" + addr + "/debug/pprof/goroutine?debug=2" r, err := http.Get(addr) if err != nil { @@ -56,4 +56,3 @@ var PprofGoroutines = &cli.Command{ return r.Body.Close() }, } - diff --git a/cmd/lotus-storage-miner/config.go b/cmd/lotus-storage-miner/config.go index 0c843fe23..e5e4fc4c4 100644 --- a/cmd/lotus-storage-miner/config.go +++ b/cmd/lotus-storage-miner/config.go @@ -9,7 +9,7 @@ import ( ) var configCmd = &cli.Command{ - Name: "config", + Name: "config", Usage: "Output default configuration", Action: func(cctx *cli.Context) error { comm, err := config.ConfigComment(config.DefaultStorageMiner()) diff --git a/extern/storage-sealing/fsm_test.go b/extern/storage-sealing/fsm_test.go index 474ea1966..f41d8c535 100644 --- a/extern/storage-sealing/fsm_test.go +++ b/extern/storage-sealing/fsm_test.go @@ -70,12 +70,12 @@ func TestHappyPath(t *testing.T) { func TestSeedRevert(t *testing.T) { ma, _ := address.NewIDAddress(55151) m := test{ - s: &Sealing{ - maddr: ma, - stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, - }, - }, + s: &Sealing{ + maddr: ma, + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, + }, t: t, state: &SectorInfo{State: Packing}, } @@ -117,12 +117,12 @@ func TestSeedRevert(t *testing.T) { func TestPlanCommittingHandlesSectorCommitFailed(t *testing.T) { ma, _ := address.NewIDAddress(55151) m := test{ - s: &Sealing{ - maddr: ma, - stats: SectorStats{ - bySector: map[abi.SectorID]statSectorState{}, - }, - }, + s: &Sealing{ + maddr: ma, + stats: SectorStats{ + 
bySector: map[abi.SectorID]statSectorState{}, + }, + }, t: t, state: &SectorInfo{State: Committing}, } diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index ad0b12c40..36fa7edad 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -13,4 +13,3 @@ type Config struct { WaitDealsDelay time.Duration } - diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 006118e58..43c8973db 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -297,7 +297,7 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { var best abi.SectorNumber = math.MaxUint64 for sn, info := range m.unsealedInfoMap.infos { - if info.stored + 1 > mostStored + 1 { // 18446744073709551615 + 1 = 0 + if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0 best = sn } } diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go index 3d31a39ff..871c962c1 100644 --- a/extern/storage-sealing/stats.go +++ b/extern/storage-sealing/stats.go @@ -7,6 +7,7 @@ import ( ) type statSectorState int + const ( sstSealing statSectorState = iota sstFailed @@ -18,7 +19,7 @@ type SectorStats struct { lk sync.Mutex bySector map[abi.SectorID]statSectorState - totals [nsst]uint64 + totals [nsst]uint64 } func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { @@ -35,7 +36,6 @@ func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { ss.totals[sst]++ } - // return the number of sectors currently in the sealing pipeline func (ss *SectorStats) curSealing() uint64 { ss.lk.Lock() diff --git a/node/config/def.go b/node/config/def.go index fcddf73ba..f3b33f5aa 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -139,11 +139,11 @@ func DefaultFullNode() *FullNode { func DefaultStorageMiner() *StorageMiner { cfg := &StorageMiner{ Common: defCommon(), - + Sealing: SealingConfig{ 
MaxWaitDealsSectors: 2, // 64G with 32G sectors MaxSealingSectors: 0, - WaitDealsDelay: Duration(time.Hour), + WaitDealsDelay: Duration(time.Hour), }, Storage: sectorstorage.SealerConfig{ From 6a7d72c030435523cabc7b3b775ecc92c39d4088 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 19:26:17 +0200 Subject: [PATCH 6/7] fsm: fail early on too-many-sealing-sectors in pledge --- extern/storage-sealing/garbage.go | 11 +++++++++++ extern/storage-sealing/sealing.go | 11 ----------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index d8cdb4248..4b95c1b67 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -31,6 +31,17 @@ func (m *Sealing) pledgeSector(ctx context.Context, sectorID abi.SectorID, exist } func (m *Sealing) PledgeSector() error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting config: %w", err) + } + + if cfg.MaxSealingSectors > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectors { + return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) + } + } + go func() { ctx := context.TODO() // we can't use the context from command which invokes // this, as we run everything here async, and it's cancelled when the diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 43c8973db..d52f2d731 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -361,17 +361,6 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error { - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting config: %w", err) - } - - if cfg.MaxSealingSectors > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectors { 
- return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) - } - } - rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) From a03192a39b0f1bb30cda4347ef03bcfa9eff495c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 18 Aug 2020 19:52:20 +0200 Subject: [PATCH 7/7] Separate concurrent sealing sector limit for deals --- extern/storage-sealing/sealiface/config.go | 3 +++ extern/storage-sealing/sealing.go | 7 +++++-- node/config/def.go | 10 +++++++--- node/modules/storageminer.go | 7 ++++--- 4 files changed, 19 insertions(+), 8 deletions(-) diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 36fa7edad..945565562 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -11,5 +11,8 @@ type Config struct { // includes failed, 0 = no limit MaxSealingSectors uint64 + // includes failed, 0 = no limit + MaxSealingSectorsForDeals uint64 + WaitDealsDelay time.Duration } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index d52f2d731..062ade2a3 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -270,8 +270,8 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { return 0, xerrors.Errorf("getting config: %w", err) } - if cfg.MaxSealingSectors > 0 { - if m.stats.curSealing() > cfg.MaxSealingSectors { + if cfg.MaxSealingSectorsForDeals > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectorsForDeals { return 0, xerrors.Errorf("too many sectors sealing") } } @@ -280,6 +280,9 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { // run in a loop because we have to drop the map lock here for a bit tries := 0 + // we have to run in a loop as we're dropping unsealedInfoMap.lk + // to actually call StartPacking. 
When we do that, another entry can + // get added to unsealedInfoMap. for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors { if tries > 10 { // whatever... diff --git a/node/config/def.go b/node/config/def.go index f3b33f5aa..d10810998 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -54,6 +54,9 @@ type SealingConfig struct { // includes failed, 0 = no limit MaxSealingSectors uint64 + // includes failed, 0 = no limit + MaxSealingSectorsForDeals uint64 + WaitDealsDelay Duration } @@ -141,9 +144,10 @@ func DefaultStorageMiner() *StorageMiner { Common: defCommon(), Sealing: SealingConfig{ - MaxWaitDealsSectors: 2, // 64G with 32G sectors - MaxSealingSectors: 0, - WaitDealsDelay: Duration(time.Hour), + MaxWaitDealsSectors: 2, // 64G with 32G sectors + MaxSealingSectors: 0, + MaxSealingSectorsForDeals: 0, + WaitDealsDelay: Duration(time.Hour), }, Storage: sectorstorage.SealerConfig{ diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 3be869f0f..ce2427f2c 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -611,9 +611,10 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error return func() (out sealiface.Config, err error) { err = readCfg(r, func(cfg *config.StorageMiner) { out = sealiface.Config{ - MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, - MaxSealingSectors: cfg.Sealing.MaxSealingSectors, - WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, + MaxSealingSectors: cfg.Sealing.MaxSealingSectors, + MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, + WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), } }) return