fsm: Config for max WaitDeals sectors

Author: Łukasz Magiera, 2020-08-18 16:20:31 +02:00
parent 2d73b04f7d
commit 65ffde9e4b
9 changed files with 151 additions and 57 deletions


@@ -328,7 +328,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
         log.Errorf("loading sector list: %+v", err)
     }
-    sd, err := m.getSealDelay()
+    cfg, err := m.getConfig()
     if err != nil {
         return xerrors.Errorf("getting the sealing delay: %w", err)
     }
@@ -339,8 +339,8 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
         }
         if sector.State == WaitDeals {
-            if sd > 0 {
-                timer := time.NewTimer(sd)
+            if cfg.WaitDealsDelay > 0 {
+                timer := time.NewTimer(cfg.WaitDealsDelay)
                 go func() {
                     <-timer.C
                     m.StartPacking(sector.SectorNumber)
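Note: on restart, sectors already sitting in WaitDeals get their packing timer re-armed from cfg.WaitDealsDelay instead of the old standalone seal delay. The NewTimer-plus-goroutine pattern above can also be written with time.AfterFunc; the standalone sketch below (hypothetical names, not part of the commit) shows the equivalent behaviour.

package main

import (
	"fmt"
	"time"
)

func main() {
	waitDealsDelay := 50 * time.Millisecond // stands in for cfg.WaitDealsDelay

	done := make(chan struct{})
	if waitDealsDelay > 0 {
		// time.AfterFunc fires the callback on its own goroutine once the
		// delay elapses, like NewTimer + go func() { <-timer.C; ... } above.
		time.AfterFunc(waitDealsDelay, func() {
			fmt.Println("WaitDeals delay elapsed; StartPacking would run here")
			close(done)
		})
	}
	<-done
}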


@@ -3,6 +3,7 @@ package sealing
 import (
     "context"
     "io"
+    "math"
     "sync"
     "time"
@@ -33,6 +34,16 @@ type SectorLocation struct {
     Partition uint64
 }
+type Config struct {
+    // 0 = no limit
+    MaxWaitDealsSectors uint64
+
+    // includes failed, 0 = no limit
+    MaxSealingSectors uint64
+
+    WaitDealsDelay time.Duration
+}
 type SealingAPI interface {
     StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
     StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
@@ -70,7 +81,7 @@ type Sealing struct {
     upgradeLk sync.Mutex
     toUpgrade map[abi.SectorNumber]struct{}
-    getSealDelay GetSealingDelayFunc
+    getConfig GetSealingConfigFunc
 }
 type FeeConfig struct {
@@ -80,7 +91,7 @@ type FeeConfig struct {
 type UnsealedSectorMap struct {
     infos map[abi.SectorNumber]UnsealedSectorInfo
-    mux   sync.Mutex
+    lk    sync.Mutex
 }
 type UnsealedSectorInfo struct {
@@ -90,7 +101,7 @@ type UnsealedSectorInfo struct {
     pieceSizes []abi.UnpaddedPieceSize
 }
-func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gsd GetSealingDelayFunc) *Sealing {
+func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds datastore.Batching, sealer sectorstorage.SectorManager, sc SectorIDCounter, verif ffiwrapper.Verifier, pcp PreCommitPolicy, gc GetSealingConfigFunc) *Sealing {
     s := &Sealing{
         api:    api,
         feeCfg: fc,
@@ -103,11 +114,11 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
         pcp: pcp,
         unsealedInfoMap: UnsealedSectorMap{
             infos: make(map[abi.SectorNumber]UnsealedSectorInfo),
-            mux:   sync.Mutex{},
+            lk:    sync.Mutex{},
         },
         toUpgrade: map[abi.SectorNumber]struct{}{},
-        getSealDelay: gsd,
+        getConfig: gc,
     }
     s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
@@ -137,18 +148,18 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
         return 0, 0, xerrors.Errorf("piece cannot fit into a sector")
     }
-    m.unsealedInfoMap.mux.Lock()
+    m.unsealedInfoMap.lk.Lock()
     sid, pads, err := m.getSectorAndPadding(size)
     if err != nil {
-        m.unsealedInfoMap.mux.Unlock()
+        m.unsealedInfoMap.lk.Unlock()
         return 0, 0, xerrors.Errorf("getting available sector: %w", err)
     }
     for _, p := range pads {
         err = m.addPiece(ctx, sid, p.Unpadded(), NewNullReader(p.Unpadded()), nil)
         if err != nil {
-            m.unsealedInfoMap.mux.Unlock()
+            m.unsealedInfoMap.lk.Unlock()
             return 0, 0, xerrors.Errorf("writing pads: %w", err)
         }
     }
@@ -157,12 +168,15 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
     err = m.addPiece(ctx, sid, size, r, &d)
     if err != nil {
-        m.unsealedInfoMap.mux.Unlock()
+        m.unsealedInfoMap.lk.Unlock()
         return 0, 0, xerrors.Errorf("adding piece to sector: %w", err)
     }
-    m.unsealedInfoMap.mux.Unlock()
-    if m.unsealedInfoMap.infos[sid].numDeals == getDealPerSectorLimit(m.sealer.SectorSize()) {
+    startPacking := m.unsealedInfoMap.infos[sid].numDeals >= getDealPerSectorLimit(m.sealer.SectorSize())
+    m.unsealedInfoMap.lk.Unlock()
+    if startPacking {
         if err := m.StartPacking(sid); err != nil {
             return 0, 0, xerrors.Errorf("start packing: %w", err)
         }
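Note the reordering in the hunk above: the deal-count check is now evaluated into startPacking while the map lock is still held, and only then is the lock released; the old code read m.unsealedInfoMap.infos[sid] after Unlock, which races with concurrent callers (the comparison also changes from == to >=, so overshooting the limit still triggers packing). A simplified, self-contained sketch of the pattern (hypothetical values, not part of the commit):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var lk sync.Mutex
	numDeals := map[uint64]int{42: 3}
	dealLimit := 3

	lk.Lock()
	// Snapshot the decision while the lock is held...
	startPacking := numDeals[42] >= dealLimit
	lk.Unlock()
	// ...so no shared state is read after Unlock.

	if startPacking {
		fmt.Println("would call StartPacking(42)")
	}
}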
@@ -171,7 +185,7 @@ func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPiec
     return sid, offset, nil
 }
-// Caller should hold m.unsealedInfoMap.mux
+// Caller should hold m.unsealedInfoMap.lk
 func (m *Sealing) addPiece(ctx context.Context, sectorID abi.SectorNumber, size abi.UnpaddedPieceSize, r io.Reader, di *DealInfo) error {
     log.Infof("Adding piece to sector %d", sectorID)
     ppi, err := m.sealer.AddPiece(sectorstorage.WithPriority(ctx, DealSectorPriority), m.minerSector(sectorID), m.unsealedInfoMap.infos[sectorID].pieceSizes, size, r)
@@ -206,7 +220,7 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
     return m.sectors.Send(uint64(sid), SectorRemove{})
 }
-// Caller should NOT hold m.unsealedInfoMap.mux
+// Caller should NOT hold m.unsealedInfoMap.lk
 func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
     log.Infof("Starting packing sector %d", sectorID)
     err := m.sectors.Send(uint64(sectorID), SectorStartPacking{})
@@ -214,14 +228,14 @@ func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
         return err
     }
-    m.unsealedInfoMap.mux.Lock()
+    m.unsealedInfoMap.lk.Lock()
     delete(m.unsealedInfoMap.infos, sectorID)
-    m.unsealedInfoMap.mux.Unlock()
+    m.unsealedInfoMap.lk.Unlock()
     return nil
 }
-// Caller should hold m.unsealedInfoMap.mux
+// Caller should hold m.unsealedInfoMap.lk
 func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNumber, []abi.PaddedPieceSize, error) {
     ss := abi.PaddedPieceSize(m.sealer.SectorSize())
     for k, v := range m.unsealedInfoMap.infos {
@@ -231,7 +245,7 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
         }
     }
-    ns, err := m.newSector()
+    ns, err := m.newDealSector()
     if err != nil {
         return 0, nil, err
     }
@@ -245,8 +259,57 @@ func (m *Sealing) getSectorAndPadding(size abi.UnpaddedPieceSize) (abi.SectorNum
     return ns, nil, nil
 }
-// newSector creates a new sector for deal storage
-func (m *Sealing) newSector() (abi.SectorNumber, error) {
+// newDealSector creates a new sector for deal storage
+func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
+    // First make sure we don't have too many 'open' sectors
+    cfg, err := m.getConfig()
+    if err != nil {
+        return 0, xerrors.Errorf("getting config: %w", err)
+    }
+
+    if cfg.MaxWaitDealsSectors > 0 {
+        // run in a loop because we have to drop the map lock here for a bit
+        tries := 0
+        for uint64(len(m.unsealedInfoMap.infos)) >= cfg.MaxWaitDealsSectors {
+            if tries > 10 {
+                // whatever...
+                break
+            }
+
+            if tries > 0 {
+                m.unsealedInfoMap.lk.Unlock()
+                time.Sleep(time.Second)
+                m.unsealedInfoMap.lk.Lock()
+            }
+
+            tries++
+
+            var mostStored abi.PaddedPieceSize = math.MaxUint64
+            var best abi.SectorNumber = math.MaxUint64
+
+            for sn, info := range m.unsealedInfoMap.infos {
+                if info.stored+1 > mostStored+1 { // 18446744073709551615 + 1 = 0
+                    best = sn
+                }
+            }
+
+            if best == math.MaxUint64 {
+                // probably not possible, but who knows
+                break
+            }
+
+            m.unsealedInfoMap.lk.Unlock()
+            if err := m.StartPacking(best); err != nil {
+                log.Error("newDealSector StartPacking error: %+v", err)
+                continue // let's pretend this is fine
+            }
+            m.unsealedInfoMap.lk.Lock()
+        }
+    }
+
+    // Now actually create a new sector
     sid, err := m.sc.Next()
     if err != nil {
         return 0, xerrors.Errorf("getting sector number: %w", err)
@@ -272,13 +335,13 @@ func (m *Sealing) newSector() (abi.SectorNumber, error) {
         return 0, xerrors.Errorf("starting the sector fsm: %w", err)
     }
-    sd, err := m.getSealDelay()
+    cf, err := m.getConfig()
     if err != nil {
         return 0, xerrors.Errorf("getting the sealing delay: %w", err)
     }
-    if sd > 0 {
-        timer := time.NewTimer(sd)
+    if cf.WaitDealsDelay > 0 {
+        timer := time.NewTimer(cf.WaitDealsDelay)
     go func() {
         <-timer.C
         m.StartPacking(sid)
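Note: newDealSector now enforces MaxWaitDealsSectors. When the limit is hit it retries up to ~10 times, dropping and re-taking the map lock with a one-second sleep between attempts, and forces an already-open sector into packing; the intent is to pick the sector holding the most deal data (as written, mostStored is never reassigned inside the loop, so the comparison only rules out the MaxUint64 sentinel). The comparison relies on uint64 wraparound, demonstrated in this small standalone program (not part of the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	// math.MaxUint64 marks "no candidate yet". Adding 1 wraps it to 0
	// (18446744073709551615 + 1 = 0 in uint64 arithmetic), so any real
	// stored size satisfies stored+1 > sentinel+1.
	var sentinel uint64 = math.MaxUint64
	var stored uint64 = 0 // even an empty sector beats the sentinel

	fmt.Println(sentinel + 1)          // 0
	fmt.Println(stored+1 > sentinel+1) // true
}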


@@ -3,8 +3,6 @@ package sealing
 import (
     "bytes"
     "context"
-    "time"
     "github.com/ipfs/go-cid"
     "github.com/filecoin-project/specs-actors/actors/abi"
@@ -188,7 +186,7 @@ type MessageReceipt struct {
     GasUsed  int64
 }
-type GetSealingDelayFunc func() (time.Duration, error)
+type GetSealingConfigFunc func() (Config, error)
 func (mr *MessageReceipt) Equals(o *MessageReceipt) bool {
     return mr.ExitCode == o.ExitCode && bytes.Equal(mr.Return, o.Return) && mr.GasUsed == o.GasUsed


@@ -328,8 +328,8 @@ func Online() Option {
     Override(new(dtypes.SetConsiderOfflineStorageDealsConfigFunc), modules.NewSetConsideringOfflineStorageDealsFunc),
     Override(new(dtypes.ConsiderOfflineRetrievalDealsConfigFunc), modules.NewConsiderOfflineRetrievalDealsConfigFunc),
     Override(new(dtypes.SetConsiderOfflineRetrievalDealsConfigFunc), modules.NewSetConsiderOfflineRetrievalDealsConfigFunc),
-    Override(new(dtypes.SetSealingDelayFunc), modules.NewSetSealDelayFunc),
-    Override(new(dtypes.GetSealingDelayFunc), modules.NewGetSealDelayFunc),
+    Override(new(dtypes.SetSealingConfigFunc), modules.NewSetSealConfigFunc),
+    Override(new(dtypes.GetSealingConfigFunc), modules.NewGetSealConfigFunc),
     Override(new(dtypes.SetExpectedSealDurationFunc), modules.NewSetExpectedSealDurationFunc),
     Override(new(dtypes.GetExpectedSealDurationFunc), modules.NewGetExpectedSealDurationFunc),
 ),


@@ -31,10 +31,9 @@ type StorageMiner struct {
     Common
     Dealmaking DealmakingConfig
+    Sealing    SealingConfig
     Storage    sectorstorage.SealerConfig
     Fees       MinerFeeConfig
-    SealingDelay Duration
 }
 type DealmakingConfig struct {
@@ -48,6 +47,16 @@ type DealmakingConfig struct {
     Filter string
 }
+type SealingConfig struct {
+    // 0 = no limit
+    MaxWaitDealsSectors uint64
+
+    // includes failed, 0 = no limit
+    MaxSealingSectors uint64
+
+    WaitDealsDelay Duration
+}
 type MinerFeeConfig struct {
     MaxPreCommitGasFee types.FIL
     MaxCommitGasFee    types.FIL
@@ -131,6 +140,12 @@ func DefaultStorageMiner() *StorageMiner {
     cfg := &StorageMiner{
         Common: defCommon(),
+        Sealing: SealingConfig{
+            MaxWaitDealsSectors: 2, // 64G with 32G sectors
+            MaxSealingSectors:   0,
+            WaitDealsDelay:      Duration(time.Hour),
+        },
         Storage: sectorstorage.SealerConfig{
             AllowAddPiece:   true,
             AllowPreCommit1: true,
@@ -158,8 +173,6 @@ func DefaultStorageMiner() *StorageMiner {
             MaxCommitGasFee:     types.FIL(types.BigDiv(types.FromFil(1), types.NewInt(20))),
             MaxWindowPoStGasFee: types.FIL(types.FromFil(50)),
         },
-        SealingDelay: Duration(time.Hour),
     }
     cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
     cfg.Common.API.RemoteListenAddress = "127.0.0.1:2345"
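Note: the old top-level SealingDelay option becomes a Sealing section. WaitDealsDelay keeps the previous one-hour default, MaxSealingSectors defaults to 0 (no limit; in the hunks shown here it is only plumbed through, not yet enforced), and MaxWaitDealsSectors defaults to 2. The "64G with 32G sectors" remark is plain arithmetic, spelled out in this tiny standalone snippet (the 32 GiB sector size is an assumption taken from that comment, not from the diff):

package main

import "fmt"

func main() {
	// Two open WaitDeals sectors of 32 GiB each keep at most
	// 2 * 32 GiB = 64 GiB of unsealed space open at any time.
	const sectorSize uint64 = 32 << 30 // 32 GiB
	const maxWaitDealsSectors = 2

	fmt.Println((maxWaitDealsSectors*sectorSize)>>30, "GiB held open at once")
}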


@@ -62,8 +62,8 @@ type StorageMinerAPI struct {
     SetConsiderOfflineStorageDealsConfigFunc   dtypes.SetConsiderOfflineStorageDealsConfigFunc
     ConsiderOfflineRetrievalDealsConfigFunc    dtypes.ConsiderOfflineRetrievalDealsConfigFunc
     SetConsiderOfflineRetrievalDealsConfigFunc dtypes.SetConsiderOfflineRetrievalDealsConfigFunc
-    SetSealingDelayFunc                        dtypes.SetSealingDelayFunc
-    GetSealingDelayFunc                        dtypes.GetSealingDelayFunc
+    SetSealingConfigFunc                       dtypes.SetSealingConfigFunc
+    GetSealingConfigFunc                       dtypes.GetSealingConfigFunc
     GetExpectedSealDurationFunc                dtypes.GetExpectedSealDurationFunc
     SetExpectedSealDurationFunc                dtypes.SetExpectedSealDurationFunc
 }
@@ -232,11 +232,22 @@ func (sm *StorageMinerAPI) SectorStartSealing(ctx context.Context, number abi.Se
 }
 func (sm *StorageMinerAPI) SectorSetSealDelay(ctx context.Context, delay time.Duration) error {
-    return sm.SetSealingDelayFunc(delay)
+    cfg, err := sm.GetSealingConfigFunc()
+    if err != nil {
+        return xerrors.Errorf("get config: %w", err)
+    }
+
+    cfg.WaitDealsDelay = delay
+
+    return sm.SetSealingConfigFunc(cfg)
 }
 func (sm *StorageMinerAPI) SectorGetSealDelay(ctx context.Context) (time.Duration, error) {
-    return sm.GetSealingDelayFunc()
+    cfg, err := sm.GetSealingConfigFunc()
+    if err != nil {
+        return 0, err
+    }
+    return cfg.WaitDealsDelay, nil
 }
 func (sm *StorageMinerAPI) SectorSetExpectedSealDuration(ctx context.Context, delay time.Duration) error {
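Note: SectorSetSealDelay and SectorGetSealDelay keep their API signatures but now operate on the full sealing config; the setter is a read-modify-write (fetch the current Config, change WaitDealsDelay, store everything back). The sketch below isolates that shape with a hypothetical helper and stand-in types, not part of the commit; note the get/set pair is not atomic, so two concurrent setters can overwrite each other's fields.

package main

import (
	"fmt"
	"time"
)

// Stand-ins for sealing.Config and the dtypes getter/setter closures.
type Config struct {
	MaxWaitDealsSectors uint64
	MaxSealingSectors   uint64
	WaitDealsDelay      time.Duration
}

type getFunc func() (Config, error)
type setFunc func(Config) error

// updateConfig shows the read-modify-write shape used by SectorSetSealDelay:
// load the whole config, mutate one field, save it all back.
func updateConfig(get getFunc, set setFunc, mutate func(*Config)) error {
	cfg, err := get()
	if err != nil {
		return err
	}
	mutate(&cfg)
	return set(cfg)
}

func main() {
	stored := Config{MaxWaitDealsSectors: 2, WaitDealsDelay: time.Hour}
	get := func() (Config, error) { return stored, nil }
	set := func(c Config) error { stored = c; return nil }

	_ = updateConfig(get, set, func(c *Config) { c.WaitDealsDelay = 30 * time.Minute })
	fmt.Printf("%+v\n", stored)
}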


@@ -8,6 +8,7 @@ import (
     "github.com/filecoin-project/go-address"
     "github.com/filecoin-project/go-fil-markets/storagemarket"
+    sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
     "github.com/filecoin-project/specs-actors/actors/abi"
 )
@@ -56,10 +57,10 @@ type ConsiderOfflineRetrievalDealsConfigFunc func() (bool, error)
 type SetConsiderOfflineRetrievalDealsConfigFunc func(bool) error
 // SetSealingDelay sets how long a sector waits for more deals before sealing begins.
-type SetSealingDelayFunc func(time.Duration) error
+type SetSealingConfigFunc func(sealing.Config) error
 // GetSealingDelay returns how long a sector waits for more deals before sealing begins.
-type GetSealingDelayFunc func() (time.Duration, error)
+type GetSealingConfigFunc func() (sealing.Config, error)
 // SetExpectedSealDurationFunc is a function which is used to set how long sealing is expected to take.
 // Deals that would need to start earlier than this duration will be rejected.


@@ -141,8 +141,8 @@ func SectorIDCounter(ds dtypes.MetadataDS) sealing.SectorIDCounter {
     return &sidsc{sc}
 }
-func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
-    return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc) (*storage.Miner, error) {
+func StorageMiner(fc config.MinerFeeConfig) func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
+    return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, api lapi.FullNode, h host.Host, ds dtypes.MetadataDS, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc) (*storage.Miner, error) {
         maddr, err := minerAddrFromDS(ds)
         if err != nil {
             return nil, err
@@ -593,19 +593,27 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se
     }, nil
 }
-func NewSetSealDelayFunc(r repo.LockedRepo) (dtypes.SetSealingDelayFunc, error) {
-    return func(delay time.Duration) (err error) {
-        err = mutateCfg(r, func(cfg *config.StorageMiner) {
-            cfg.SealingDelay = config.Duration(delay)
+func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) {
+    return func(cfg sealing.Config) (err error) {
+        err = mutateCfg(r, func(c *config.StorageMiner) {
+            c.Sealing = config.SealingConfig{
+                MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
+                MaxSealingSectors:   cfg.MaxSealingSectors,
+                WaitDealsDelay:      config.Duration(cfg.WaitDealsDelay),
+            }
         })
         return
     }, nil
 }
-func NewGetSealDelayFunc(r repo.LockedRepo) (dtypes.GetSealingDelayFunc, error) {
-    return func() (out time.Duration, err error) {
+func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) {
+    return func() (out sealing.Config, err error) {
         err = readCfg(r, func(cfg *config.StorageMiner) {
-            out = time.Duration(cfg.SealingDelay)
+            out = sealing.Config{
+                MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors,
+                MaxSealingSectors:   cfg.Sealing.MaxSealingSectors,
+                WaitDealsDelay:      time.Duration(cfg.Sealing.WaitDealsDelay),
+            }
        })
         return
     }, nil


@@ -43,7 +43,7 @@ type Miner struct {
     maddr  address.Address
     worker address.Address
-    getSealDelay dtypes.GetSealingDelayFunc
+    getSealConfig dtypes.GetSealingConfigFunc
     sealing *sealing.Sealing
 }
@@ -84,7 +84,7 @@ type storageMinerApi interface {
     WalletHas(context.Context, address.Address) (bool, error)
 }
-func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingDelayFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
+func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, ds datastore.Batching, sealer sectorstorage.SectorManager, sc sealing.SectorIDCounter, verif ffiwrapper.Verifier, gsd dtypes.GetSealingConfigFunc, feeCfg config.MinerFeeConfig) (*Miner, error) {
     m := &Miner{
         api:    api,
         feeCfg: feeCfg,
@@ -96,7 +96,7 @@ func NewMiner(api storageMinerApi, maddr, worker address.Address, h host.Host, d
         maddr:  maddr,
         worker: worker,
-        getSealDelay: gsd,
+        getSealConfig: gsd,
     }
     return m, nil
@@ -120,7 +120,7 @@ func (m *Miner) Run(ctx context.Context) error {
     evts := events.NewEvents(ctx, m.api)
     adaptedAPI := NewSealingAPIAdapter(m.api)
     pcp := sealing.NewBasicPreCommitPolicy(adaptedAPI, miner.MaxSectorExpirationExtension-(miner.WPoStProvingPeriod*2), md.PeriodStart%miner.WPoStProvingPeriod)
-    m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingDelayFunc(m.getSealDelay))
+    m.sealing = sealing.New(adaptedAPI, fc, NewEventsAdapter(evts), m.maddr, m.ds, m.sealer, m.sc, m.verif, &pcp, sealing.GetSealingConfigFunc(m.getSealConfig))
     go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function