storageminer: Config for local worker task types

This commit is contained in:
Łukasz Magiera 2020-03-24 20:38:00 +01:00
parent 62cf9a5dd7
commit 5de17e903c
5 changed files with 27 additions and 10 deletions

View File

@ -407,7 +407,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api lapi.FullNode,
smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &sectorbuilder.Config{ smgr, err := sectorstorage.New(ctx, lr, stores.NewIndex(), &sectorbuilder.Config{
SealProofType: spt, SealProofType: spt,
PoStProofType: ppt, PoStProofType: ppt,
}, nil, api) }, config.Storage{true, true, true}, nil, api)
if err != nil { if err != nil {
return err return err
} }

View File

@ -382,7 +382,11 @@ func ConfigStorageMiner(c interface{}) Option {
return Error(xerrors.Errorf("invalid config from repo, got: %T", c)) return Error(xerrors.Errorf("invalid config from repo, got: %T", c))
} }
return Options(ConfigCommon(&cfg.Common)) return Options(
ConfigCommon(&cfg.Common),
Override(new(config.Storage), cfg.Storage),
)
} }
func Repo(r repo.Repo) Option { func Repo(r repo.Repo) Option {

View File

@ -53,6 +53,10 @@ type Metrics struct {
// // Storage Miner // // Storage Miner
type Storage struct { type Storage struct {
// Local worker config
AllowPreCommit1 bool
AllowPreCommit2 bool
AllowCommit bool
} }
func defCommon() Common { func defCommon() Common {
@ -86,7 +90,11 @@ func DefaultStorageMiner() *StorageMiner {
cfg := &StorageMiner{ cfg := &StorageMiner{
Common: defCommon(), Common: defCommon(),
Storage: Storage{}, Storage: Storage{
AllowPreCommit1: true,
AllowPreCommit2: true,
AllowCommit: true,
},
} }
cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http" cfg.Common.API.ListenAddress = "/ip4/127.0.0.1/tcp/2345/http"
return cfg return cfg

View File

@ -44,6 +44,7 @@ import (
"github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/markets/retrievaladapter" "github.com/filecoin-project/lotus/markets/retrievaladapter"
"github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/miner"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/node/modules/helpers" "github.com/filecoin-project/lotus/node/modules/helpers"
"github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/node/repo"
@ -338,8 +339,8 @@ func RetrievalProvider(h host.Host, miner *storage.Miner, sealer sectorstorage.S
return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, ds) return retrievalimpl.NewProvider(address, adapter, network, pieceStore, ibs, ds)
} }
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls sectorstorage.URLs, ca lapi.Common) (*sectorstorage.Manager, error) { func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, sc config.Storage, urls sectorstorage.URLs, ca lapi.Common) (*sectorstorage.Manager, error) {
ctx := helpers.LifecycleCtx(mctx, lc) ctx := helpers.LifecycleCtx(mctx, lc)
return sectorstorage.New(ctx, ls, si, cfg, urls, ca) return sectorstorage.New(ctx, ls, si, cfg, sc, urls, ca)
} }

View File

@ -74,7 +74,7 @@ type Manager struct {
schedQueue *list.List // List[*workerRequest] schedQueue *list.List // List[*workerRequest]
} }
func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, urls URLs, ca api.Common) (*Manager, error) { func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg *sectorbuilder.Config, sc config.Storage, urls URLs, ca api.Common) (*Manager, error) {
lstor, err := stores.NewLocal(ctx, ls, si, urls) lstor, err := stores.NewLocal(ctx, ls, si, urls)
if err != nil { if err != nil {
return nil, err return nil, err
@ -114,12 +114,16 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
go m.runSched() go m.runSched()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize,
}
if sc.AllowPreCommit1 { localTasks = append(localTasks, sealtasks.TTPreCommit1)}
if sc.AllowPreCommit2 { localTasks = append(localTasks, sealtasks.TTPreCommit2)}
if sc.AllowCommit { localTasks = append(localTasks, sealtasks.TTCommit2)}
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{ err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType, SealProof: cfg.SealProofType,
TaskTypes: []sealtasks.TaskType{ TaskTypes: localTasks,
sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize,
sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, // TODO: Config
},
}, stor, lstor, si)) }, stor, lstor, si))
if err != nil { if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err) return nil, xerrors.Errorf("adding local worker: %w", err)