diff --git a/cli/auth.go b/cli/auth.go index e6eeace89..caea4cb42 100644 --- a/cli/auth.go +++ b/cli/auth.go @@ -9,7 +9,6 @@ import ( "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/lotus/api" - cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/node/repo" ) @@ -128,7 +127,7 @@ var AuthApiInfoToken = &cli.Command{ // TODO: Log in audit log when it is implemented - currentEnv, _, _ := cliutil.EnvsForAPIInfos(t) + currentEnv, _, _ := t.APIInfoEnvVars() fmt.Printf("%s=%s:%s\n", currentEnv, string(token), ainfo.Addr) return nil }, diff --git a/cli/util/api.go b/cli/util/api.go index d87817bb3..6c673d91f 100644 --- a/cli/util/api.go +++ b/cli/util/api.go @@ -28,63 +28,6 @@ const ( metadataTraceContext = "traceContext" ) -// flagsForAPI returns flags passed on the command line with the listen address -// of the API server (only used by the tests), in the order of precedence they -// should be applied for the requested kind of node. -func flagsForAPI(t repo.RepoType) []string { - switch t { - case repo.FullNode: - return []string{"api-url"} - case repo.StorageMiner: - return []string{"miner-api-url"} - case repo.Worker: - return []string{"worker-api-url"} - case repo.Markets: - // support split markets-miner and monolith deployments. - return []string{"markets-api-url", "miner-api-url"} - default: - panic(fmt.Sprintf("Unknown repo type: %v", t)) - } -} - -func flagsForRepo(t repo.RepoType) []string { - switch t { - case repo.FullNode: - return []string{"repo"} - case repo.StorageMiner: - return []string{"miner-repo"} - case repo.Worker: - return []string{"worker-repo"} - case repo.Markets: - // support split markets-miner and monolith deployments. - return []string{"markets-repo", "miner-repo"} - default: - panic(fmt.Sprintf("Unknown repo type: %v", t)) - } -} - -// EnvsForAPIInfos returns the environment variables to use in order of precedence -// to determine the API endpoint of the specified node type. -// -// It returns the current variables and deprecated ones separately, so that -// the user can log a warning when deprecated ones are found to be in use. -func EnvsForAPIInfos(t repo.RepoType) (primary string, fallbacks []string, deprecated []string) { - switch t { - case repo.FullNode: - return "FULLNODE_API_INFO", nil, nil - case repo.StorageMiner: - // TODO remove deprecated deprecation period - return "MINER_API_INFO", nil, []string{"STORAGE_API_INFO"} - case repo.Worker: - return "WORKER_API_INFO", nil, nil - case repo.Markets: - // support split markets-miner and monolith deployments. - return "MARKETS_API_INFO", []string{"MINER_API_INFO"}, nil - default: - panic(fmt.Sprintf("Unknown repo type: %v", t)) - } -} - // GetAPIInfo returns the API endpoint to use for the specified kind of repo. // // The order of precedence is as follows: @@ -96,8 +39,7 @@ func EnvsForAPIInfos(t repo.RepoType) (primary string, fallbacks []string, depre func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { // Check if there was a flag passed with the listen address of the API // server (only used by the tests) - apiFlags := flagsForAPI(t) - for _, f := range apiFlags { + for _, f := range t.APIFlags() { if !ctx.IsSet(f) { continue } @@ -111,7 +53,7 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { // Note: it is not correct/intuitive to prefer environment variables over // CLI flags (repo flags below). // - primaryEnv, fallbacksEnvs, deprecatedEnvs := EnvsForAPIInfos(t) + primaryEnv, fallbacksEnvs, deprecatedEnvs := t.APIInfoEnvVars() env, ok := os.LookupEnv(primaryEnv) if ok { return ParseApiInfo(env), nil @@ -125,8 +67,7 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { } } - repoFlags := flagsForRepo(t) - for _, f := range repoFlags { + for _, f := range t.RepoFlags() { // cannot use ctx.IsSet because it ignores default values path := ctx.String(f) if path == "" { @@ -175,13 +116,13 @@ func GetAPIInfo(ctx *cli.Context, t repo.RepoType) (APIInfo, error) { } } - return APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t) + return APIInfo{}, fmt.Errorf("could not determine API endpoint for node type: %v", t.Type()) } func GetRawAPI(ctx *cli.Context, t repo.RepoType, version string) (string, http.Header, error) { ainfo, err := GetAPIInfo(ctx, t) if err != nil { - return "", nil, xerrors.Errorf("could not get API info for %s: %w", t, err) + return "", nil, xerrors.Errorf("could not get API info for %s: %w", t.Type(), err) } addr, err := ainfo.DialArgs(version) diff --git a/cmd/lotus-shed/datastore.go b/cmd/lotus-shed/datastore.go index 698e63324..7cdb2b1e6 100644 --- a/cmd/lotus-shed/datastore.go +++ b/cmd/lotus-shed/datastore.go @@ -40,10 +40,10 @@ var datastoreListCmd = &cli.Command{ Name: "list", Description: "list datastore keys", Flags: []cli.Flag{ - &cli.IntFlag{ + &cli.StringFlag{ Name: "repo-type", - Usage: "node type (1 - full, 2 - storage, 3 - worker)", - Value: 1, + Usage: "node type (FullNode, StorageMiner, Worker, Wallet)", + Value: "FullNode", }, &cli.BoolFlag{ Name: "top-level", @@ -71,7 +71,7 @@ var datastoreListCmd = &cli.Command{ return xerrors.Errorf("lotus repo doesn't exist") } - lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type"))) + lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type"))) if err != nil { return err } @@ -109,10 +109,10 @@ var datastoreGetCmd = &cli.Command{ Name: "get", Description: "list datastore keys", Flags: []cli.Flag{ - &cli.IntFlag{ + &cli.StringFlag{ Name: "repo-type", - Usage: "node type (1 - full, 2 - storage, 3 - worker)", - Value: 1, + Usage: "node type (FullNode, StorageMiner, Worker, Wallet)", + Value: "FullNode", }, &cli.StringFlag{ Name: "enc", @@ -137,7 +137,7 @@ var datastoreGetCmd = &cli.Command{ return xerrors.Errorf("lotus repo doesn't exist") } - lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type"))) + lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type"))) if err != nil { return err } diff --git a/cmd/lotus-shed/rpc.go b/cmd/lotus-shed/rpc.go index 81171916e..b253304a1 100644 --- a/cmd/lotus-shed/rpc.go +++ b/cmd/lotus-shed/rpc.go @@ -34,9 +34,11 @@ var rpcCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - rt := repo.FullNode + var rt repo.RepoType if cctx.Bool("miner") { rt = repo.StorageMiner + } else { + rt = repo.FullNode } addr, headers, err := lcli.GetRawAPI(cctx, rt, cctx.String("version")) diff --git a/itests/batch_deal_test.go b/itests/batch_deal_test.go index 96aa5c62d..b3c77fcba 100644 --- a/itests/batch_deal_test.go +++ b/itests/batch_deal_test.go @@ -51,7 +51,8 @@ func TestBatchDealInput(t *testing.T) { })), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { return func() (sealiface.Config, error) { - sc := modules.ToSealingConfig(config.DefaultStorageMiner()) + cfg := config.DefaultStorageMiner() + sc := modules.ToSealingConfig(cfg.Dealmaking, cfg.Sealing) sc.MaxWaitDealsSectors = 2 sc.MaxSealingSectors = 1 sc.MaxSealingSectorsForDeals = 3 diff --git a/itests/sector_finalize_early_test.go b/itests/sector_finalize_early_test.go index 233bc8fcb..0f0fcdec6 100644 --- a/itests/sector_finalize_early_test.go +++ b/itests/sector_finalize_early_test.go @@ -39,7 +39,7 @@ func TestDealsWithFinalizeEarly(t *testing.T) { return func() (sealiface.Config, error) { cf := config.DefaultStorageMiner() cf.Sealing.FinalizeEarly = true - return modules.ToSealingConfig(cf), nil + return modules.ToSealingConfig(cf.Dealmaking, cf.Sealing), nil }, nil })))) // no mock proofs. ens.InterconnectAll().BeginMining(blockTime) diff --git a/itests/sector_miner_collateral_test.go b/itests/sector_miner_collateral_test.go index 81aefc3ad..22c04b5ea 100644 --- a/itests/sector_miner_collateral_test.go +++ b/itests/sector_miner_collateral_test.go @@ -42,7 +42,8 @@ func TestMinerBalanceCollateral(t *testing.T) { opts := kit.ConstructorOpts( node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) { return func() (sealiface.Config, error) { - sc := modules.ToSealingConfig(config.DefaultStorageMiner()) + cfg := config.DefaultStorageMiner() + sc := modules.ToSealingConfig(cfg.Dealmaking, cfg.Sealing) sc.MaxWaitDealsSectors = 4 sc.MaxSealingSectors = 4 diff --git a/node/builder.go b/node/builder.go index 6386f78a6..f0106ad97 100644 --- a/node/builder.go +++ b/node/builder.go @@ -231,8 +231,12 @@ func IsType(t repo.RepoType) func(s *Settings) bool { } func isFullOrLiteNode(s *Settings) bool { return s.nodeType == repo.FullNode } -func isFullNode(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite } -func isLiteNode(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite } +func isFullNode(s *Settings) bool { + return s.nodeType == repo.FullNode && !s.Lite +} +func isLiteNode(s *Settings) bool { + return s.nodeType == repo.FullNode && s.Lite +} func Base() Option { return Options( diff --git a/node/builder_miner.go b/node/builder_miner.go index d24beff80..5b7beb50b 100644 --- a/node/builder_miner.go +++ b/node/builder_miner.go @@ -153,8 +153,8 @@ func ConfigStorageMiner(c interface{}) Option { Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)), // DAG Store - Override(new(dagstore.MinerAPI), modules.NewMinerAPI), - Override(DAGStoreKey, modules.DAGStore), + Override(new(dagstore.MinerAPI), modules.NewMinerAPI(cfg.DAGStore)), + Override(DAGStoreKey, modules.DAGStore(cfg.DAGStore)), // Markets (retrieval) Override(new(dagstore.SectorAccessor), sectoraccessor.NewSectorAccessor), diff --git a/node/config/dynamic_config.go b/node/config/dynamic_config.go new file mode 100644 index 000000000..777ca4712 --- /dev/null +++ b/node/config/dynamic_config.go @@ -0,0 +1,27 @@ +package config + +type DealmakingConfiger interface { + GetDealmakingConfig() DealmakingConfig + SetDealmakingConfig(DealmakingConfig) +} + +func (c *StorageMiner) GetDealmakingConfig() DealmakingConfig { + return c.Dealmaking +} + +func (c *StorageMiner) SetDealmakingConfig(other DealmakingConfig) { + c.Dealmaking = other +} + +type SealingConfiger interface { + GetSealingConfig() SealingConfig + SetSealingConfig(SealingConfig) +} + +func (c *StorageMiner) GetSealingConfig() SealingConfig { + return c.Sealing +} + +func (c *StorageMiner) SetSealingConfig(other SealingConfig) { + c.Sealing = other +} diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index 05d41a427..179363e29 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -764,8 +764,9 @@ func StorageAuthWithURL(apiInfo string) func(ctx helpers.MetricsCtx, ca v0api.Co func NewConsiderOnlineStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOnlineStorageDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderOnlineStorageDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderOnlineStorageDeals }) return }, nil @@ -773,8 +774,10 @@ func NewConsiderOnlineStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.Consider func NewSetConsideringOnlineStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsiderOnlineStorageDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderOnlineStorageDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderOnlineStorageDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -782,8 +785,9 @@ func NewSetConsideringOnlineStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsi func NewConsiderOnlineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOnlineRetrievalDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderOnlineRetrievalDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderOnlineRetrievalDeals }) return }, nil @@ -791,8 +795,10 @@ func NewConsiderOnlineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Consid func NewSetConsiderOnlineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.SetConsiderOnlineRetrievalDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderOnlineRetrievalDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderOnlineRetrievalDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -800,8 +806,9 @@ func NewSetConsiderOnlineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Set func NewStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.StorageDealPieceCidBlocklistConfigFunc, error) { return func() (out []cid.Cid, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.PieceCidBlocklist + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.PieceCidBlocklist }) return }, nil @@ -809,8 +816,10 @@ func NewStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.Storag func NewSetStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.SetStorageDealPieceCidBlocklistConfigFunc, error) { return func(blocklist []cid.Cid) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.PieceCidBlocklist = blocklist + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.PieceCidBlocklist = blocklist + c.SetDealmakingConfig(cfg) }) return }, nil @@ -818,8 +827,9 @@ func NewSetStorageDealPieceCidBlocklistConfigFunc(r repo.LockedRepo) (dtypes.Set func NewConsiderOfflineStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOfflineStorageDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderOfflineStorageDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderOfflineStorageDeals }) return }, nil @@ -827,8 +837,10 @@ func NewConsiderOfflineStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.Conside func NewSetConsideringOfflineStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsiderOfflineStorageDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderOfflineStorageDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderOfflineStorageDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -836,8 +848,9 @@ func NewSetConsideringOfflineStorageDealsFunc(r repo.LockedRepo) (dtypes.SetCons func NewConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderOfflineRetrievalDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderOfflineRetrievalDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderOfflineRetrievalDeals }) return }, nil @@ -845,8 +858,10 @@ func NewConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Consi func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.SetConsiderOfflineRetrievalDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderOfflineRetrievalDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderOfflineRetrievalDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -854,8 +869,9 @@ func NewSetConsiderOfflineRetrievalDealsConfigFunc(r repo.LockedRepo) (dtypes.Se func NewConsiderVerifiedStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderVerifiedStorageDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderVerifiedStorageDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderVerifiedStorageDeals }) return }, nil @@ -863,8 +879,10 @@ func NewConsiderVerifiedStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.Consid func NewSetConsideringVerifiedStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsiderVerifiedStorageDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderVerifiedStorageDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderVerifiedStorageDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -872,8 +890,9 @@ func NewSetConsideringVerifiedStorageDealsFunc(r repo.LockedRepo) (dtypes.SetCon func NewConsiderUnverifiedStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.ConsiderUnverifiedStorageDealsConfigFunc, error) { return func() (out bool, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = cfg.Dealmaking.ConsiderUnverifiedStorageDeals + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = cfg.ConsiderUnverifiedStorageDeals }) return }, nil @@ -881,8 +900,10 @@ func NewConsiderUnverifiedStorageDealsConfigFunc(r repo.LockedRepo) (dtypes.Cons func NewSetConsideringUnverifiedStorageDealsFunc(r repo.LockedRepo) (dtypes.SetConsiderUnverifiedStorageDealsConfigFunc, error) { return func(b bool) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ConsiderUnverifiedStorageDeals = b + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ConsiderUnverifiedStorageDeals = b + c.SetDealmakingConfig(cfg) }) return }, nil @@ -890,8 +911,8 @@ func NewSetConsideringUnverifiedStorageDealsFunc(r repo.LockedRepo) (dtypes.SetC func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error) { return func(cfg sealiface.Config) (err error) { - err = mutateCfg(r, func(c *config.StorageMiner) { - c.Sealing = config.SealingConfig{ + err = mutateSealingCfg(r, func(c config.SealingConfiger) { + newCfg := config.SealingConfig{ MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, @@ -922,51 +943,54 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error TerminateBatchMin: cfg.TerminateBatchMin, TerminateBatchWait: config.Duration(cfg.TerminateBatchWait), } + c.SetSealingConfig(newCfg) }) return }, nil } -func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config { +func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.SealingConfig) sealiface.Config { return sealiface.Config{ - MaxWaitDealsSectors: cfg.Sealing.MaxWaitDealsSectors, - MaxSealingSectors: cfg.Sealing.MaxSealingSectors, - MaxSealingSectorsForDeals: cfg.Sealing.MaxSealingSectorsForDeals, - StartEpochSealingBuffer: abi.ChainEpoch(cfg.Dealmaking.StartEpochSealingBuffer), - MakeNewSectorForDeals: cfg.Dealmaking.MakeNewSectorForDeals, - CommittedCapacitySectorLifetime: time.Duration(cfg.Sealing.CommittedCapacitySectorLifetime), - WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), - MakeCCSectorsAvailable: cfg.Sealing.MakeCCSectorsAvailable, - AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, - FinalizeEarly: cfg.Sealing.FinalizeEarly, + MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, + MaxSealingSectors: sealingCfg.MaxSealingSectors, + MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, + StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer), + MakeNewSectorForDeals: dealmakingCfg.MakeNewSectorForDeals, + CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime), + WaitDealsDelay: time.Duration(sealingCfg.WaitDealsDelay), + MakeCCSectorsAvailable: sealingCfg.MakeCCSectorsAvailable, + AlwaysKeepUnsealedCopy: sealingCfg.AlwaysKeepUnsealedCopy, + FinalizeEarly: sealingCfg.FinalizeEarly, - CollateralFromMinerBalance: cfg.Sealing.CollateralFromMinerBalance, - AvailableBalanceBuffer: types.BigInt(cfg.Sealing.AvailableBalanceBuffer), - DisableCollateralFallback: cfg.Sealing.DisableCollateralFallback, + CollateralFromMinerBalance: sealingCfg.CollateralFromMinerBalance, + AvailableBalanceBuffer: types.BigInt(sealingCfg.AvailableBalanceBuffer), + DisableCollateralFallback: sealingCfg.DisableCollateralFallback, - BatchPreCommits: cfg.Sealing.BatchPreCommits, - MaxPreCommitBatch: cfg.Sealing.MaxPreCommitBatch, - PreCommitBatchWait: time.Duration(cfg.Sealing.PreCommitBatchWait), - PreCommitBatchSlack: time.Duration(cfg.Sealing.PreCommitBatchSlack), + BatchPreCommits: sealingCfg.BatchPreCommits, + MaxPreCommitBatch: sealingCfg.MaxPreCommitBatch, + PreCommitBatchWait: time.Duration(sealingCfg.PreCommitBatchWait), + PreCommitBatchSlack: time.Duration(sealingCfg.PreCommitBatchSlack), - AggregateCommits: cfg.Sealing.AggregateCommits, - MinCommitBatch: cfg.Sealing.MinCommitBatch, - MaxCommitBatch: cfg.Sealing.MaxCommitBatch, - CommitBatchWait: time.Duration(cfg.Sealing.CommitBatchWait), - CommitBatchSlack: time.Duration(cfg.Sealing.CommitBatchSlack), - AggregateAboveBaseFee: types.BigInt(cfg.Sealing.AggregateAboveBaseFee), - BatchPreCommitAboveBaseFee: types.BigInt(cfg.Sealing.BatchPreCommitAboveBaseFee), + AggregateCommits: sealingCfg.AggregateCommits, + MinCommitBatch: sealingCfg.MinCommitBatch, + MaxCommitBatch: sealingCfg.MaxCommitBatch, + CommitBatchWait: time.Duration(sealingCfg.CommitBatchWait), + CommitBatchSlack: time.Duration(sealingCfg.CommitBatchSlack), + AggregateAboveBaseFee: types.BigInt(sealingCfg.AggregateAboveBaseFee), + BatchPreCommitAboveBaseFee: types.BigInt(sealingCfg.BatchPreCommitAboveBaseFee), - TerminateBatchMax: cfg.Sealing.TerminateBatchMax, - TerminateBatchMin: cfg.Sealing.TerminateBatchMin, - TerminateBatchWait: time.Duration(cfg.Sealing.TerminateBatchWait), + TerminateBatchMax: sealingCfg.TerminateBatchMax, + TerminateBatchMin: sealingCfg.TerminateBatchMin, + TerminateBatchWait: time.Duration(sealingCfg.TerminateBatchWait), } } func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error) { return func() (out sealiface.Config, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = ToSealingConfig(cfg) + err = readSealingCfg(r, func(dc config.DealmakingConfiger, sc config.SealingConfiger) { + scfg := sc.GetSealingConfig() + dcfg := dc.GetDealmakingConfig() + out = ToSealingConfig(dcfg, scfg) }) return }, nil @@ -974,8 +998,10 @@ func NewGetSealConfigFunc(r repo.LockedRepo) (dtypes.GetSealingConfigFunc, error func NewSetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.SetExpectedSealDurationFunc, error) { return func(delay time.Duration) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.ExpectedSealDuration = config.Duration(delay) + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.ExpectedSealDuration = config.Duration(delay) + c.SetDealmakingConfig(cfg) }) return }, nil @@ -983,8 +1009,9 @@ func NewSetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.SetExpectedSealDu func NewGetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.GetExpectedSealDurationFunc, error) { return func() (out time.Duration, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = time.Duration(cfg.Dealmaking.ExpectedSealDuration) + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = time.Duration(cfg.ExpectedSealDuration) }) return }, nil @@ -992,8 +1019,10 @@ func NewGetExpectedSealDurationFunc(r repo.LockedRepo) (dtypes.GetExpectedSealDu func NewSetMaxDealStartDelayFunc(r repo.LockedRepo) (dtypes.SetMaxDealStartDelayFunc, error) { return func(delay time.Duration) (err error) { - err = mutateCfg(r, func(cfg *config.StorageMiner) { - cfg.Dealmaking.MaxDealStartDelay = config.Duration(delay) + err = mutateDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + cfg.MaxDealStartDelay = config.Duration(delay) + c.SetDealmakingConfig(cfg) }) return }, nil @@ -1001,22 +1030,60 @@ func NewSetMaxDealStartDelayFunc(r repo.LockedRepo) (dtypes.SetMaxDealStartDelay func NewGetMaxDealStartDelayFunc(r repo.LockedRepo) (dtypes.GetMaxDealStartDelayFunc, error) { return func() (out time.Duration, err error) { - err = readCfg(r, func(cfg *config.StorageMiner) { - out = time.Duration(cfg.Dealmaking.MaxDealStartDelay) + err = readDealmakingCfg(r, func(c config.DealmakingConfiger) { + cfg := c.GetDealmakingConfig() + out = time.Duration(cfg.MaxDealStartDelay) }) return }, nil } -func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error { +func readSealingCfg(r repo.LockedRepo, accessor func(config.DealmakingConfiger, config.SealingConfiger)) error { raw, err := r.Config() if err != nil { return err } - cfg, ok := raw.(*config.StorageMiner) + scfg, ok := raw.(config.SealingConfiger) if !ok { - return xerrors.New("expected address of config.StorageMiner") + return xerrors.New("expected config with sealing config trait") + } + + dcfg, ok := raw.(config.DealmakingConfiger) + if !ok { + return xerrors.New("expected config with dealmaking config trait") + } + + accessor(dcfg, scfg) + + return nil +} + +func mutateSealingCfg(r repo.LockedRepo, mutator func(config.SealingConfiger)) error { + var typeErr error + + setConfigErr := r.SetConfig(func(raw interface{}) { + cfg, ok := raw.(config.SealingConfiger) + if !ok { + typeErr = errors.New("expected config with sealing config trait") + return + } + + mutator(cfg) + }) + + return multierr.Combine(typeErr, setConfigErr) +} + +func readDealmakingCfg(r repo.LockedRepo, accessor func(config.DealmakingConfiger)) error { + raw, err := r.Config() + if err != nil { + return err + } + + cfg, ok := raw.(config.DealmakingConfiger) + if !ok { + return xerrors.New("expected config with dealmaking config trait") } accessor(cfg) @@ -1024,13 +1091,13 @@ func readCfg(r repo.LockedRepo, accessor func(*config.StorageMiner)) error { return nil } -func mutateCfg(r repo.LockedRepo, mutator func(*config.StorageMiner)) error { +func mutateDealmakingCfg(r repo.LockedRepo, mutator func(config.DealmakingConfiger)) error { var typeErr error setConfigErr := r.SetConfig(func(raw interface{}) { - cfg, ok := raw.(*config.StorageMiner) + cfg, ok := raw.(config.DealmakingConfiger) if !ok { - typeErr = errors.New("expected miner config") + typeErr = errors.New("expected config with dealmaking config trait") return } diff --git a/node/modules/storageminer_dagstore.go b/node/modules/storageminer_dagstore.go index e90c3bd90..84f7b33b6 100644 --- a/node/modules/storageminer_dagstore.go +++ b/node/modules/storageminer_dagstore.go @@ -25,88 +25,70 @@ const ( ) // NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts. -func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) { - cfg, err := extractDAGStoreConfig(r) - if err != nil { - return nil, err - } - - // caps the amount of concurrent calls to the storage, so that we don't - // spam it during heavy processes like bulk migration. - if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - cfg.MaxConcurrencyStorageCalls = concurrency - } - } - - mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals) - ready := make(chan error, 1) - pieceStore.OnReady(func(err error) { - ready <- err - }) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - if err := <-ready; err != nil { - return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) +func NewMinerAPI(cfg config.DAGStoreConfig) func(fx.Lifecycle, repo.LockedRepo, dtypes.ProviderPieceStore, mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) { + // caps the amount of concurrent calls to the storage, so that we don't + // spam it during heavy processes like bulk migration. + if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + cfg.MaxConcurrencyStorageCalls = concurrency } - return mountApi.Start(ctx) - }, - OnStop: func(context.Context) error { - return nil - }, - }) + } - return mountApi, nil + mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals) + ready := make(chan error, 1) + pieceStore.OnReady(func(err error) { + ready <- err + }) + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + if err := <-ready; err != nil { + return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err) + } + return mountApi.Start(ctx) + }, + OnStop: func(context.Context) error { + return nil + }, + }) + + return mountApi, nil + } } // DAGStore constructs a DAG store using the supplied minerAPI, and the // user configuration. It returns both the DAGStore and the Wrapper suitable for // passing to markets. -func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI, h host.Host) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { - cfg, err := extractDAGStoreConfig(r) - if err != nil { - return nil, nil, err - } - - // fall back to default root directory if not explicitly set in the config. - if cfg.RootDir == "" { - cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir) - } - - v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency) - if ok { - concurrency, err := strconv.Atoi(v) - if err == nil { - cfg.MaxConcurrentReadyFetches = concurrency +func DAGStore(cfg config.DAGStoreConfig) func(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI, h host.Host) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { + return func(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI, h host.Host) (*dagstore.DAGStore, *mdagstore.Wrapper, error) { + // fall back to default root directory if not explicitly set in the config. + if cfg.RootDir == "" { + cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir) } + + v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency) + if ok { + concurrency, err := strconv.Atoi(v) + if err == nil { + cfg.MaxConcurrentReadyFetches = concurrency + } + } + + dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI, h) + if err != nil { + return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) + } + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + return w.Start(ctx) + }, + OnStop: func(context.Context) error { + return w.Close() + }, + }) + + return dagst, w, nil } - - dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI, h) - if err != nil { - return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err) - } - - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return w.Start(ctx) - }, - OnStop: func(context.Context) error { - return w.Close() - }, - }) - - return dagst, w, nil -} - -func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) { - cfg, err := r.Config() - if err != nil { - return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err) - } - mcfg, ok := cfg.(*config.StorageMiner) - if !ok { - return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg) - } - return mcfg.DAGStore, nil } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index 5c1c91bc5..9688a518b 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -41,47 +41,174 @@ const ( fsKeystore = "keystore" ) -type RepoType int - -const ( - _ = iota // Default is invalid - FullNode RepoType = iota - StorageMiner - Worker - Wallet - Markets -) - -func (t RepoType) String() string { - s := [...]string{ - "__invalid__", - "FullNode", - "StorageMiner", - "Worker", - "Wallet", - "Markets", +func NewRepoTypeFromString(t string) RepoType { + switch t { + case "FullNode": + return FullNode + case "StorageMiner": + return StorageMiner + case "Worker": + return Worker + case "Wallet": + return Wallet + default: + panic("unknown RepoType") } - if t < 0 || int(t) > len(s) { - return "__invalid__" - } - return s[t] } -func defConfForType(t RepoType) interface{} { - switch t { - case FullNode: - return config.DefaultFullNode() - case StorageMiner, Markets: - // markets is a specialised miner service - // this taxonomy needs to be cleaned up - return config.DefaultStorageMiner() - case Worker: - return &struct{}{} - case Wallet: - return &struct{}{} - default: - panic(fmt.Sprintf("unknown RepoType(%d)", int(t))) - } +type RepoType interface { + Type() string + Config() interface{} + + // APIFlags returns flags passed on the command line with the listen address + // of the API server (only used by the tests), in the order of precedence they + // should be applied for the requested kind of node. + APIFlags() []string + + RepoFlags() []string + + // APIInfoEnvVars returns the environment variables to use in order of precedence + // to determine the API endpoint of the specified node type. + // + // It returns the current variables and deprecated ones separately, so that + // the user can log a warning when deprecated ones are found to be in use. + APIInfoEnvVars() (string, []string, []string) +} + +// SupportsStagingDeals is a trait for services that support staging deals +type SupportsStagingDeals interface { + SupportsStagingDeals() +} + +var FullNode fullNode + +type fullNode struct { +} + +func (fullNode) Type() string { + return "FullNode" +} + +func (fullNode) Config() interface{} { + return config.DefaultFullNode() +} + +func (fullNode) APIFlags() []string { + return []string{"api-url"} +} + +func (fullNode) RepoFlags() []string { + return []string{"repo"} +} + +func (fullNode) APIInfoEnvVars() (primary string, fallbacks []string, deprecated []string) { + return "FULLNODE_API_INFO", nil, nil +} + +var StorageMiner storageMiner + +type storageMiner struct{} + +func (storageMiner) SupportsStagingDeals() {} + +func (storageMiner) Type() string { + return "StorageMiner" +} + +func (storageMiner) Config() interface{} { + return config.DefaultStorageMiner() +} + +func (storageMiner) APIFlags() []string { + return []string{"miner-api-url"} +} + +func (storageMiner) RepoFlags() []string { + return []string{"miner-repo"} +} + +func (storageMiner) APIInfoEnvVars() (primary string, fallbacks []string, deprecated []string) { + // TODO remove deprecated deprecation period + return "MINER_API_INFO", nil, []string{"STORAGE_API_INFO"} +} + +var Markets markets + +type markets struct{} + +func (markets) SupportsStagingDeals() {} + +func (markets) Type() string { + return "Markets" +} + +func (markets) Config() interface{} { + return config.DefaultStorageMiner() +} + +func (markets) APIFlags() []string { + // support split markets-miner and monolith deployments. + return []string{"markets-api-url", "miner-api-url"} +} + +func (markets) RepoFlags() []string { + // support split markets-miner and monolith deployments. + return []string{"markets-repo", "miner-repo"} +} + +func (markets) APIInfoEnvVars() (primary string, fallbacks []string, deprecated []string) { + // support split markets-miner and monolith deployments. + return "MARKETS_API_INFO", []string{"MINER_API_INFO"}, nil +} + +type worker struct { +} + +var Worker worker + +func (worker) Type() string { + return "Worker" +} + +func (worker) Config() interface{} { + return &struct{}{} +} + +func (worker) APIFlags() []string { + return []string{"worker-api-url"} +} + +func (worker) RepoFlags() []string { + return []string{"worker-repo"} +} + +func (worker) APIInfoEnvVars() (primary string, fallbacks []string, deprecated []string) { + return "WORKER_API_INFO", nil, nil +} + +var Wallet wallet + +type wallet struct { +} + +func (wallet) Type() string { + return "Wallet" +} + +func (wallet) Config() interface{} { + return &struct{}{} +} + +func (wallet) APIFlags() []string { + panic("not supported") +} + +func (wallet) RepoFlags() []string { + panic("not supported") +} + +func (wallet) APIInfoEnvVars() (primary string, fallbacks []string, deprecated []string) { + panic("not supported") } var log = logging.Logger("repo") @@ -165,7 +292,7 @@ func (fsr *FsRepo) initConfig(t RepoType) error { return err } - comm, err := config.ConfigComment(defConfForType(t)) + comm, err := config.ConfigComment(t.Config()) if err != nil { return xerrors.Errorf("comment: %w", err) } @@ -406,7 +533,7 @@ func (fsr *fsLockedRepo) Config() (interface{}, error) { } func (fsr *fsLockedRepo) loadConfigFromDisk() (interface{}, error) { - return config.FromFile(fsr.configPath, defConfForType(fsr.repoType)) + return config.FromFile(fsr.configPath, fsr.repoType.Config()) } func (fsr *fsLockedRepo) SetConfig(c func(interface{})) error { diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index 3be4ab567..977eec8b6 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -36,9 +36,6 @@ type MemRepo struct { keystore map[string]types.KeyInfo blockstore blockstore.Blockstore - // given a repo type, produce the default config - configF func(t RepoType) interface{} - // holds the current config value config struct { sync.Mutex @@ -106,7 +103,7 @@ func (lmem *lockedMemRepo) Path() string { panic(err) // only used in tests, probably fine } - if lmem.t == StorageMiner { + if _, ok := lmem.t.(SupportsStagingDeals); ok { // this is required due to the method makeDealStaging from cmd/lotus-storage-miner/init.go // deal-staging is the directory deal files are staged in before being sealed into sectors // for offline deal flow. @@ -144,7 +141,6 @@ var _ Repo = &MemRepo{} // MemRepoOptions contains options for memory repo type MemRepoOptions struct { Ds datastore.Datastore - ConfigF func(RepoType) interface{} KeyStore map[string]types.KeyInfo } @@ -155,9 +151,6 @@ func NewMemory(opts *MemRepoOptions) *MemRepo { if opts == nil { opts = &MemRepoOptions{} } - if opts.ConfigF == nil { - opts.ConfigF = defConfForType - } if opts.Ds == nil { opts.Ds = dssync.MutexWrap(datastore.NewMapDatastore()) } @@ -169,7 +162,6 @@ func NewMemory(opts *MemRepoOptions) *MemRepo { repoLock: make(chan struct{}, 1), blockstore: blockstore.WrapIDStore(blockstore.NewMemorySync()), datastore: opts.Ds, - configF: opts.ConfigF, keystore: opts.KeyStore, } } @@ -284,7 +276,7 @@ func (lmem *lockedMemRepo) Config() (interface{}, error) { defer lmem.mem.config.Unlock() if lmem.mem.config.val == nil { - lmem.mem.config.val = lmem.mem.configF(lmem.t) + lmem.mem.config.val = lmem.t.Config() } return lmem.mem.config.val, nil @@ -299,7 +291,7 @@ func (lmem *lockedMemRepo) SetConfig(c func(interface{})) error { defer lmem.mem.config.Unlock() if lmem.mem.config.val == nil { - lmem.mem.config.val = lmem.mem.configF(lmem.t) + lmem.mem.config.val = lmem.t.Config() } c(lmem.mem.config.val)