refactor RepoType

This commit is contained in:
Anton Evangelatov 2022-02-10 17:33:38 +01:00
parent 6b06c1b5d3
commit e3edab66e3
50 changed files with 248 additions and 224 deletions

View File

@ -121,7 +121,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
policy.SetSupportedProofTypes(abi.RegisteredSealProof_StackedDrg2KiBV1)
mr := repo.NewMemory(nil)
lr, err := mr.Lock(repo.StorageMiner)
lr, err := mr.Lock(repo.StorageMinerRepoType{})
if err != nil {
return nil, xerrors.Errorf("taking mem-repo lock failed: %w", err)
}

View File

@ -48,7 +48,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
b.Fatal(err)
}

View File

@ -114,7 +114,7 @@ var AuthApiInfoToken = &cli.Command{
ti, ok := cctx.App.Metadata["repoType"]
if !ok {
log.Errorf("unknown repo type, are you sure you want to use GetCommonAPI?")
ti = repo.FullNode
ti = repo.FullNodeRepoType{}
}
t, ok := ti.(repo.RepoType)
if !ok {

View File

@ -403,7 +403,7 @@ var clientRetrieveCatCmd = &cli.Command{
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
ainfo, err := GetAPIInfo(cctx, repo.FullNode)
ainfo, err := GetAPIInfo(cctx, repo.FullNodeRepoType{})
if err != nil {
return xerrors.Errorf("could not get API info: %w", err)
}
@ -482,7 +482,7 @@ var clientRetrieveLsCmd = &cli.Command{
return ShowHelp(cctx, fmt.Errorf("incorrect number of arguments"))
}
ainfo, err := GetAPIInfo(cctx, repo.FullNode)
ainfo, err := GetAPIInfo(cctx, repo.FullNodeRepoType{})
if err != nil {
return xerrors.Errorf("could not get API info: %w", err)
}

View File

@ -26,7 +26,7 @@ var PprofGoroutines = &cli.Command{
ti, ok := cctx.App.Metadata["repoType"]
if !ok {
log.Errorf("unknown repo type, are you sure you want to use GetAPI?")
ti = repo.FullNode
ti = repo.FullNodeRepoType{}
}
t, ok := ti.(repo.RepoType)
if !ok {

View File

@ -32,14 +32,14 @@ const (
// of the API server (only used by the tests), in the order of precedence they
// should be applied for the requested kind of node.
func flagsForAPI(t repo.RepoType) []string {
switch t {
case repo.FullNode:
switch t.Type() {
case "FullNode":
return []string{"api-url"}
case repo.StorageMiner:
case "StorageMiner":
return []string{"miner-api-url"}
case repo.Worker:
case "Worker":
return []string{"worker-api-url"}
case repo.Markets:
case "Markets":
// support split markets-miner and monolith deployments.
return []string{"markets-api-url", "miner-api-url"}
default:
@ -48,14 +48,14 @@ func flagsForAPI(t repo.RepoType) []string {
}
func flagsForRepo(t repo.RepoType) []string {
switch t {
case repo.FullNode:
switch t.Type() {
case "FullNode":
return []string{"repo"}
case repo.StorageMiner:
case "StorageMiner":
return []string{"miner-repo"}
case repo.Worker:
case "Worker":
return []string{"worker-repo"}
case repo.Markets:
case "Markets":
// support split markets-miner and monolith deployments.
return []string{"markets-repo", "miner-repo"}
default:
@ -69,15 +69,15 @@ func flagsForRepo(t repo.RepoType) []string {
// It returns the current variables and deprecated ones separately, so that
// the user can log a warning when deprecated ones are found to be in use.
func EnvsForAPIInfos(t repo.RepoType) (primary string, fallbacks []string, deprecated []string) {
switch t {
case repo.FullNode:
switch t.Type() {
case "FullNode":
return "FULLNODE_API_INFO", nil, nil
case repo.StorageMiner:
case "StorageMiner":
// TODO remove deprecated deprecation period
return "MINER_API_INFO", nil, []string{"STORAGE_API_INFO"}
case repo.Worker:
case "Worker":
return "WORKER_API_INFO", nil, nil
case repo.Markets:
case "Markets":
// support split markets-miner and monolith deployments.
return "MARKETS_API_INFO", []string{"MINER_API_INFO"}, nil
default:
@ -200,7 +200,7 @@ func GetCommonAPI(ctx *cli.Context) (api.CommonNet, jsonrpc.ClientCloser, error)
ti, ok := ctx.App.Metadata["repoType"]
if !ok {
log.Errorf("unknown repo type, are you sure you want to use GetCommonAPI?")
ti = repo.FullNode
ti = repo.FullNodeRepoType{}
}
t, ok := ti.(repo.RepoType)
if !ok {
@ -232,7 +232,7 @@ func GetFullNodeAPI(ctx *cli.Context) (v0api.FullNode, jsonrpc.ClientCloser, err
return &v0api.WrapperV1Full{FullNode: tn.(v1api.FullNode)}, func() {}, nil
}
addr, headers, err := GetRawAPI(ctx, repo.FullNode, "v0")
addr, headers, err := GetRawAPI(ctx, repo.FullNodeRepoType{}, "v0")
if err != nil {
return nil, nil, err
}
@ -249,7 +249,7 @@ func GetFullNodeAPIV1(ctx *cli.Context) (v1api.FullNode, jsonrpc.ClientCloser, e
return tn.(v1api.FullNode), func() {}, nil
}
addr, headers, err := GetRawAPI(ctx, repo.FullNode, "v1")
addr, headers, err := GetRawAPI(ctx, repo.FullNodeRepoType{}, "v1")
if err != nil {
return nil, nil, err
}
@ -293,7 +293,7 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...GetStorageMinerOption) (api.St
return tn.(api.StorageMiner), func() {}, nil
}
addr, headers, err := GetRawAPI(ctx, repo.StorageMiner, "v0")
addr, headers, err := GetRawAPI(ctx, repo.StorageMinerRepoType{}, "v0")
if err != nil {
return nil, nil, err
}
@ -322,7 +322,7 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...GetStorageMinerOption) (api.St
}
func GetWorkerAPI(ctx *cli.Context) (api.Worker, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.Worker, "v0")
addr, headers, err := GetRawAPI(ctx, repo.WorkerRepoType{}, "v0")
if err != nil {
return nil, nil, err
}
@ -340,7 +340,7 @@ func GetMarketsAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, er
return tn.(api.StorageMiner), func() {}, nil
}
addr, headers, err := GetRawAPI(ctx, repo.Markets, "v0")
addr, headers, err := GetRawAPI(ctx, repo.MarketsRepoType{}, "v0")
if err != nil {
return nil, nil, err
}
@ -356,7 +356,7 @@ func GetMarketsAPI(ctx *cli.Context) (api.StorageMiner, jsonrpc.ClientCloser, er
}
func GetGatewayAPI(ctx *cli.Context) (api.Gateway, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.FullNode, "v1")
addr, headers, err := GetRawAPI(ctx, repo.FullNodeRepoType{}, "v1")
if err != nil {
return nil, nil, err
}
@ -369,7 +369,7 @@ func GetGatewayAPI(ctx *cli.Context) (api.Gateway, jsonrpc.ClientCloser, error)
}
func GetGatewayAPIV0(ctx *cli.Context) (v0api.Gateway, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.FullNode, "v0")
addr, headers, err := GetRawAPI(ctx, repo.FullNodeRepoType{}, "v0")
if err != nil {
return nil, nil, err
}

View File

@ -40,7 +40,7 @@ func TestWorkerKeyChange(t *testing.T) {
run := func(cmd *cli.Command, args ...string) error {
app := cli.NewApp()
app.Metadata = map[string]interface{}{
"repoType": repo.StorageMiner,
"repoType": repo.StorageMinerRepoType{},
"testnode-full": client1,
"testnode-storage": miner,
}

View File

@ -36,7 +36,7 @@ func TestMinerAllInfo(t *testing.T) {
run := func(t *testing.T) {
app := cli.NewApp()
app.Metadata = map[string]interface{}{
"repoType": repo.StorageMiner,
"repoType": repo.StorageMinerRepoType{},
"testnode-full": client,
"testnode-storage": miner,
}

View File

@ -9,6 +9,6 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)
var backupCmd = lcli.BackupCmd(FlagMinerRepo, repo.StorageMiner, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
var backupCmd = lcli.BackupCmd(FlagMinerRepo, repo.StorageMinerRepoType{}, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
return lcli.GetStorageMinerAPI(cctx)
})

View File

@ -66,7 +66,7 @@ var configUpdateCmd = &cli.Command{
return xerrors.Errorf("repo not initialized")
}
lr, err := r.LockRO(repo.StorageMiner)
lr, err := r.LockRO(repo.StorageMinerRepoType{})
if err != nil {
return xerrors.Errorf("locking repo: %w", err)
}

View File

@ -203,12 +203,12 @@ var initCmd = &cli.Command{
log.Info("Initializing repo")
if err := r.Init(repo.StorageMiner); err != nil {
if err := r.Init(repo.StorageMinerRepoType{}); err != nil {
return err
}
{
lr, err := r.Lock(repo.StorageMiner)
lr, err := r.Lock(repo.StorageMinerRepoType{})
if err != nil {
return err
}
@ -410,7 +410,7 @@ func findMarketDealID(ctx context.Context, api v1api.FullNode, deal market2.Deal
}
func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode, r repo.Repo, ssize abi.SectorSize, gasPrice types.BigInt) error {
lr, err := r.Lock(repo.StorageMiner)
lr, err := r.Lock(repo.StorageMinerRepoType{})
if err != nil {
return err
}

View File

@ -159,11 +159,11 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
log.Info("Initializing repo")
if err := r.Init(repo.StorageMiner); err != nil {
if err := r.Init(repo.StorageMinerRepoType{}); err != nil {
return err
}
lr, err := r.Lock(repo.StorageMiner)
lr, err := r.Lock(repo.StorageMinerRepoType{})
if err != nil {
return err
}

View File

@ -93,7 +93,7 @@ func main() {
return err
}
}
c.App.Metadata["repoType"] = repo.Markets
c.App.Metadata["repoType"] = repo.MarketsRepoType{}
return nil
}
@ -150,7 +150,7 @@ func main() {
// this command is explicitly called on markets, inform
// common commands by overriding the repoType.
if c.Bool("call-on-markets") {
c.App.Metadata["repoType"] = repo.Markets
c.App.Metadata["repoType"] = repo.MarketsRepoType{}
}
return nil
},
@ -164,7 +164,7 @@ func main() {
},
}
app.Setup()
app.Metadata["repoType"] = repo.StorageMiner
app.Metadata["repoType"] = repo.StorageMinerRepoType{}
lcli.RunApp(app)
}

View File

@ -119,7 +119,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("repo at '%s' is not initialized, run 'lotus-miner init' to set it up", minerRepoPath)
}
lr, err := r.Lock(repo.StorageMiner)
lr, err := r.Lock(repo.StorageMinerRepoType{})
if err != nil {
return err
}

View File

@ -108,7 +108,7 @@ func main() {
Commands: local,
}
app.Setup()
app.Metadata["repoType"] = repo.Worker
app.Metadata["repoType"] = repo.WorkerRepoType{}
if err := app.Run(os.Args); err != nil {
log.Warnf("%+v", err)
@ -310,11 +310,11 @@ var runCmd = &cli.Command{
return err
}
if !ok {
if err := r.Init(repo.Worker); err != nil {
if err := r.Init(repo.WorkerRepoType{}); err != nil {
return err
}
lr, err := r.Lock(repo.Worker)
lr, err := r.Lock(repo.WorkerRepoType{})
if err != nil {
return err
}
@ -359,7 +359,7 @@ var runCmd = &cli.Command{
}
}
lr, err := r.Lock(repo.Worker)
lr, err := r.Lock(repo.WorkerRepoType{})
if err != nil {
return err
}
@ -397,7 +397,7 @@ var runCmd = &cli.Command{
}
// Setup remote sector store
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
sminfo, err := lcli.GetAPIInfo(cctx, repo.StorageMinerRepoType{})
if err != nil {
return xerrors.Errorf("could not get api info: %w", err)
}
@ -484,7 +484,7 @@ var runCmd = &cli.Command{
return xerrors.Errorf("setting api endpoint: %w", err)
}
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMinerRepoType{})
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
}

View File

@ -486,7 +486,7 @@ var chainBalanceStateCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}
@ -710,7 +710,7 @@ var chainPledgeCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -39,10 +39,10 @@ var datastoreListCmd = &cli.Command{
Name: "list",
Description: "list datastore keys",
Flags: []cli.Flag{
&cli.IntFlag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (1 - full, 2 - storage, 3 - worker)",
Value: 1,
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.BoolFlag{
Name: "top-level",
@ -70,7 +70,7 @@ var datastoreListCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type")))
lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}
@ -108,10 +108,10 @@ var datastoreGetCmd = &cli.Command{
Name: "get",
Description: "list datastore keys",
Flags: []cli.Flag{
&cli.IntFlag{
&cli.StringFlag{
Name: "repo-type",
Usage: "node type (1 - full, 2 - storage, 3 - worker)",
Value: 1,
Usage: "node type (FullNode, StorageMiner, Worker, Wallet)",
Value: "FullNode",
},
&cli.StringFlag{
Name: "enc",
@ -136,7 +136,7 @@ var datastoreGetCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.RepoType(cctx.Int("repo-type")))
lr, err := r.Lock(repo.NewRepoTypeFromString(cctx.String("repo-type")))
if err != nil {
return err
}

View File

@ -67,7 +67,7 @@ var exportCarCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -59,7 +59,7 @@ var exportChainCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -35,7 +35,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}
@ -112,7 +112,7 @@ var importObjectCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -183,7 +183,7 @@ var keyinfoImportCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -301,7 +301,7 @@ func openLockedRepo(path string) (repo.LockedRepo, error) {
}
// Lock the repo
lr, err := rpo.Lock(repo.StorageMiner)
lr, err := rpo.Lock(repo.StorageMinerRepoType{})
if err != nil {
return nil, xerrors.Errorf("locking repo %s: %w", path, err)
}

View File

@ -47,7 +47,7 @@ var migrationsCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -54,7 +54,7 @@ var minerPeeridCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -50,7 +50,7 @@ var minerTypesCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -123,7 +123,7 @@ var stateTreePruneCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -34,9 +34,11 @@ var rpcCmd = &cli.Command{
},
},
Action: func(cctx *cli.Context) error {
rt := repo.FullNode
var rt repo.RepoType
if cctx.Bool("miner") {
rt = repo.StorageMiner
rt = repo.StorageMinerRepoType{}
} else {
rt = repo.FullNodeRepoType{}
}
addr, headers, err := lcli.GetRawAPI(cctx, rt, cctx.String("version"))
@ -123,7 +125,7 @@ var rpcCmd = &cli.Command{
return send(cctx.Args().Get(0), params)
}
cctx.App.Metadata["repoType"] = repo.FullNode
cctx.App.Metadata["repoType"] = repo.FullNodeRepoType{}
if err := lcli.VersionCmd.Action(cctx); err != nil {
return err
}

View File

@ -67,7 +67,7 @@ var splitstoreRollbackCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return xerrors.Errorf("error locking repo: %w", err)
}
@ -153,7 +153,7 @@ var splitstoreClearCmd = &cli.Command{
return xerrors.Errorf("lotus repo doesn't exist")
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return xerrors.Errorf("error locking repo: %w", err)
}

View File

@ -53,7 +53,7 @@ var terminationsCmd = &cli.Command{
return err
}
lkrepo, err := fsrepo.Lock(repo.FullNode)
lkrepo, err := fsrepo.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -41,7 +41,7 @@ func OpenNode(ctx context.Context, path string) (*Node, error) {
// NewNode constructs a new node from the given repo.
func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) {
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return nil, err
}

View File

@ -278,12 +278,12 @@ func openRepo(cctx *cli.Context) (repo.LockedRepo, types.KeyStore, error) {
return nil, nil, err
}
if !ok {
if err := r.Init(repo.Worker); err != nil {
if err := r.Init(repo.WorkerRepoType{}); err != nil {
return nil, nil, err
}
}
lr, err := r.Lock(repo.Wallet)
lr, err := r.Lock(repo.WalletRepoType{})
if err != nil {
return nil, nil, err
}

View File

@ -18,7 +18,7 @@ import (
"github.com/filecoin-project/lotus/node/repo"
)
var backupCmd = lcli.BackupCmd("repo", repo.FullNode, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
var backupCmd = lcli.BackupCmd("repo", repo.FullNodeRepoType{}, func(cctx *cli.Context) (lcli.BackupAPI, jsonrpc.ClientCloser, error) {
return lcli.GetFullNodeAPI(cctx)
})
@ -39,7 +39,7 @@ func restore(cctx *cli.Context, r repo.Repo) error {
}
defer f.Close() // nolint:errcheck
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -66,7 +66,7 @@ var configUpdateCmd = &cli.Command{
return xerrors.Errorf("repo not initialized")
}
lr, err := r.LockRO(repo.FullNode)
lr, err := r.LockRO(repo.FullNodeRepoType{})
if err != nil {
return xerrors.Errorf("locking repo: %w", err)
}

View File

@ -227,7 +227,7 @@ var DaemonCmd = &cli.Command{
r.SetConfigPath(cctx.String("config"))
}
err = r.Init(repo.FullNode)
err = r.Init(repo.FullNodeRepoType{})
if err != nil && err != repo.ErrRepoExists {
return xerrors.Errorf("repo init error: %w", err)
}
@ -462,7 +462,7 @@ func ImportChain(ctx context.Context, r repo.Repo, fname string, snapshot bool)
l = st.Size()
}
lr, err := r.Lock(repo.FullNode)
lr, err := r.Lock(repo.FullNodeRepoType{})
if err != nil {
return err
}

View File

@ -106,7 +106,7 @@ func main() {
app.Setup()
app.Metadata["traceContext"] = ctx
app.Metadata["repoType"] = repo.FullNode
app.Metadata["repoType"] = repo.FullNodeRepoType{}
lcli.RunApp(app)
}

View File

@ -341,8 +341,8 @@ FIXME: This section needs to be clarified / corrected...I don't fully understand
At the end of the `Repo()` function we see two mutually exclusive configuration calls based on the `RepoType` (`node/repo/fsrepo.go`).
```Go
ApplyIf(isType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMiner), ConfigStorageMiner(c)),
ApplyIf(isType(repo.FullNodeRepoType{}), ConfigFullNode(c)),
ApplyIf(isType(repo.StorageMinerRepoType{}), ConfigStorageMiner(c)),
```
As we said, the repo fully identifies the node so a repo type is also a *node* type, in this case a full node or a miner. (FIXME: What is the difference between the two, does *full* imply miner?) In this case the `daemon` command will create a `FullNode`, this is specified in the command logic itself in `main.DaemonCmd()`, the `FsRepo` created (and passed to `node.Repo()`) will be initiated with that type (see `(*FsRepo).Init(t RepoType)`).
@ -352,7 +352,7 @@ FIXME: Much of this might need to be subsumed into the p2p section
The `node.Online()` configuration function (`node/builder.go`) initializes components that involve connecting to,
or interacting with, the Filecoin network. These connections are managed through the libp2p stack (FIXME link to this section when it exists).
We discuss some of the components found in the full node type (that is, included in the `ApplyIf(isType(repo.FullNode),` call).
We discuss some of the components found in the full node type (that is, included in the `ApplyIf(isType(repo.FullNodeRepoType{}),` call).
#### Chainstore

View File

@ -73,8 +73,8 @@ func TestMoveShared(t *testing.T) {
openRepo := func(dir string) repo.LockedRepo {
r, err := repo.NewFS(dir)
require.NoError(t, err)
require.NoError(t, r.Init(repo.Worker))
lr, err := r.Lock(repo.Worker)
require.NoError(t, r.Init(repo.WorkerRepoType{}))
lr, err := r.Lock(repo.WorkerRepoType{})
require.NoError(t, err)
t.Cleanup(func() {

View File

@ -146,7 +146,7 @@ func TestSimultanenousTransferLimit(t *testing.T) {
)
runTest := func(t *testing.T) {
client, miner, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
node.ApplyIf(node.IsType(repo.StorageMinerRepoType{}), node.Override(new(dtypes.StagingGraphsync), modules.StagingGraphsync(graphsyncThrottle, 0, graphsyncThrottle))),
node.Override(new(dtypes.Graphsync), modules.Graphsync(graphsyncThrottle, graphsyncThrottle)),
))
ens.InterconnectAll().BeginMining(250 * time.Millisecond)

View File

@ -443,7 +443,7 @@ func (n *Ensemble) Start() *Ensemble {
r := repo.NewMemory(nil)
lr, err := r.Lock(repo.StorageMiner)
lr, err := r.Lock(repo.StorageMinerRepoType{})
require.NoError(n.t, err)
c, err := lr.Config()

View File

@ -35,7 +35,7 @@ func TestDealsWithFinalizeEarly(t *testing.T) {
var blockTime = 50 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.ThroughRPC(), kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
node.ApplyIf(node.IsType(repo.StorageMinerRepoType{}), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
cf := config.DefaultStorageMiner()
cf.Sealing.FinalizeEarly = true

View File

@ -38,7 +38,7 @@ func TestMinerBalanceCollateral(t *testing.T) {
defer cancel()
opts := kit.ConstructorOpts(
node.ApplyIf(node.IsType(repo.StorageMiner), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
node.ApplyIf(node.IsType(repo.StorageMinerRepoType{}), node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
return func() (sealiface.Config, error) {
return sealiface.Config{
MaxWaitDealsSectors: 4,

View File

@ -224,12 +224,16 @@ var LibP2P = Options(
)
func IsType(t repo.RepoType) func(s *Settings) bool {
return func(s *Settings) bool { return s.nodeType == t }
return func(s *Settings) bool { return s.nodeType.Type() == t.Type() }
}
func isFullOrLiteNode(s *Settings) bool { return s.nodeType == repo.FullNode }
func isFullNode(s *Settings) bool { return s.nodeType == repo.FullNode && !s.Lite }
func isLiteNode(s *Settings) bool { return s.nodeType == repo.FullNode && s.Lite }
func isFullOrLiteNode(s *Settings) bool { return s.nodeType.Type() == repo.FullNodeRepoType{}.Type() }
func isFullNode(s *Settings) bool {
return s.nodeType.Type() == repo.FullNodeRepoType{}.Type() && !s.Lite
}
func isLiteNode(s *Settings) bool {
return s.nodeType.Type() == repo.FullNodeRepoType{}.Type() && s.Lite
}
func Base() Option {
return Options(
@ -241,7 +245,7 @@ func Base() Option {
LibP2P,
),
ApplyIf(isFullOrLiteNode, ChainNode),
ApplyIf(IsType(repo.StorageMiner), MinerNode),
ApplyIf(IsType(repo.StorageMinerRepoType{}), MinerNode),
)
}
@ -314,8 +318,8 @@ func Repo(r repo.Repo) Option {
Override(new(*dtypes.APIAlg), modules.APISecret),
ApplyIf(IsType(repo.FullNode), ConfigFullNode(c)),
ApplyIf(IsType(repo.StorageMiner), ConfigStorageMiner(c)),
ApplyIf(IsType(repo.FullNodeRepoType{}), ConfigFullNode(c)),
ApplyIf(IsType(repo.StorageMinerRepoType{}), ConfigStorageMiner(c)),
)(settings)
}
}

View File

@ -248,7 +248,7 @@ func Lite(enable bool) FullOption {
func FullAPI(out *api.FullNode, fopts ...FullOption) Option {
return Options(
func(s *Settings) error {
s.nodeType = repo.FullNode
s.nodeType = repo.FullNodeRepoType{}
s.enableLibp2pNode = true
return nil
},

View File

@ -151,8 +151,8 @@ func ConfigStorageMiner(c interface{}) Option {
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
// DAG Store
Override(new(dagstore.MinerAPI), modules.NewMinerAPI),
Override(DAGStoreKey, modules.DAGStore),
Override(new(dagstore.MinerAPI), modules.NewMinerAPI(cfg.DAGStore)),
Override(DAGStoreKey, modules.DAGStore(cfg.DAGStore)),
// Markets (retrieval)
Override(new(dagstore.SectorAccessor), sectoraccessor.NewSectorAccessor),
@ -221,7 +221,7 @@ func StorageMiner(out *api.StorageMiner, subsystemsCfg config.MinerSubsystemConf
),
func(s *Settings) error {
s.nodeType = repo.StorageMiner
s.nodeType = repo.StorageMinerRepoType{}
s.enableLibp2pNode = subsystemsCfg.EnableMarkets
return nil
},

View File

@ -23,88 +23,70 @@ const (
)
// NewMinerAPI creates a new MinerAPI adaptor for the dagstore mounts.
func NewMinerAPI(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) {
cfg, err := extractDAGStoreConfig(r)
if err != nil {
return nil, err
}
// caps the amount of concurrent calls to the storage, so that we don't
// spam it during heavy processes like bulk migration.
if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrencyStorageCalls = concurrency
}
}
mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
func NewMinerAPI(cfg config.DAGStoreConfig) func(fx.Lifecycle, repo.LockedRepo, dtypes.ProviderPieceStore, mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) {
return func(lc fx.Lifecycle, r repo.LockedRepo, pieceStore dtypes.ProviderPieceStore, sa mdagstore.SectorAccessor) (mdagstore.MinerAPI, error) {
// caps the amount of concurrent calls to the storage, so that we don't
// spam it during heavy processes like bulk migration.
if v, ok := os.LookupEnv("LOTUS_DAGSTORE_MOUNT_CONCURRENCY"); ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrencyStorageCalls = concurrency
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
}
return mountApi, nil
mountApi := mdagstore.NewMinerAPI(pieceStore, sa, cfg.MaxConcurrencyStorageCalls, cfg.MaxConcurrentUnseals)
ready := make(chan error, 1)
pieceStore.OnReady(func(err error) {
ready <- err
})
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
if err := <-ready; err != nil {
return fmt.Errorf("aborting dagstore start; piecestore failed to start: %s", err)
}
return mountApi.Start(ctx)
},
OnStop: func(context.Context) error {
return nil
},
})
return mountApi, nil
}
}
// DAGStore constructs a DAG store using the supplied minerAPI, and the
// user configuration. It returns both the DAGStore and the Wrapper suitable for
// passing to markets.
func DAGStore(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
cfg, err := extractDAGStoreConfig(r)
if err != nil {
return nil, nil, err
}
// fall back to default root directory if not explicitly set in the config.
if cfg.RootDir == "" {
cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir)
}
v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrentReadyFetches = concurrency
func DAGStore(cfg config.DAGStoreConfig) func(fx.Lifecycle, repo.LockedRepo, mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
return func(lc fx.Lifecycle, r repo.LockedRepo, minerAPI mdagstore.MinerAPI) (*dagstore.DAGStore, *mdagstore.Wrapper, error) {
// fall back to default root directory if not explicitly set in the config.
if cfg.RootDir == "" {
cfg.RootDir = filepath.Join(r.Path(), DefaultDAGStoreDir)
}
v, ok := os.LookupEnv(EnvDAGStoreCopyConcurrency)
if ok {
concurrency, err := strconv.Atoi(v)
if err == nil {
cfg.MaxConcurrentReadyFetches = concurrency
}
}
dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
if err != nil {
return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return w.Start(ctx)
},
OnStop: func(context.Context) error {
return w.Close()
},
})
return dagst, w, nil
}
dagst, w, err := mdagstore.NewDAGStore(cfg, minerAPI)
if err != nil {
return nil, nil, xerrors.Errorf("failed to create DAG store: %w", err)
}
lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return w.Start(ctx)
},
OnStop: func(context.Context) error {
return w.Close()
},
})
return dagst, w, nil
}
func extractDAGStoreConfig(r repo.LockedRepo) (config.DAGStoreConfig, error) {
cfg, err := r.Config()
if err != nil {
return config.DAGStoreConfig{}, xerrors.Errorf("could not load config: %w", err)
}
mcfg, ok := cfg.(*config.StorageMiner)
if !ok {
return config.DAGStoreConfig{}, xerrors.Errorf("config not expected type; expected config.StorageMiner, got: %T", cfg)
}
return mcfg.DAGStore, nil
}

View File

@ -41,47 +41,83 @@ const (
fsKeystore = "keystore"
)
type RepoType int
const (
_ = iota // Default is invalid
FullNode RepoType = iota
StorageMiner
Worker
Wallet
Markets
)
func (t RepoType) String() string {
s := [...]string{
"__invalid__",
"FullNode",
"StorageMiner",
"Worker",
"Wallet",
"Markets",
func NewRepoTypeFromString(t string) RepoType {
switch t {
case "FullNode":
return FullNodeRepoType{}
case "StorageMiner":
return StorageMinerRepoType{}
case "Worker":
return WorkerRepoType{}
case "Wallet":
return WalletRepoType{}
default:
panic("unknown RepoType")
}
if t < 0 || int(t) > len(s) {
return "__invalid__"
}
return s[t]
}
type RepoType interface {
Type() string
Config() interface{}
}
type FullNodeRepoType struct {
}
func (f FullNodeRepoType) Type() string {
return "FullNode"
}
func (f FullNodeRepoType) Config() interface{} {
return config.DefaultFullNode()
}
type StorageMinerRepoType struct {
}
func (f StorageMinerRepoType) Type() string {
return "StorageMiner"
}
func (f StorageMinerRepoType) Config() interface{} {
return config.DefaultStorageMiner()
}
type MarketsRepoType struct {
}
func (f MarketsRepoType) Type() string {
return "Markets"
}
func (f MarketsRepoType) Config() interface{} {
return config.DefaultStorageMiner()
}
type WorkerRepoType struct {
}
func (f WorkerRepoType) Type() string {
return "Worker"
}
func (f WorkerRepoType) Config() interface{} {
return &struct{}{}
}
type WalletRepoType struct {
}
func (f WalletRepoType) Type() string {
return "Wallet"
}
func (f WalletRepoType) Config() interface{} {
return &struct{}{}
}
func defConfForType(t RepoType) interface{} {
switch t {
case FullNode:
return config.DefaultFullNode()
case StorageMiner, Markets:
// markets is a specialised miner service
// this taxonomy needs to be cleaned up
return config.DefaultStorageMiner()
case Worker:
return &struct{}{}
case Wallet:
return &struct{}{}
default:
panic(fmt.Sprintf("unknown RepoType(%d)", int(t)))
}
return t.Config()
}
var log = logging.Logger("repo")

View File

@ -18,7 +18,7 @@ func genFsRepo(t *testing.T) (*FsRepo, func()) {
t.Fatal(err)
}
err = repo.Init(FullNode)
err = repo.Init(FullNodeRepoType{})
if err != ErrRepoExists && err != nil {
t.Fatal(err)
}

View File

@ -106,7 +106,7 @@ func (lmem *lockedMemRepo) Path() string {
panic(err) // only used in tests, probably fine
}
if lmem.t == StorageMiner {
if lmem.t.Type() == "StorageMiner" || lmem.t.Type() == "Boost" {
// this is required due to the method makeDealStaging from cmd/lotus-storage-miner/init.go
// deal-staging is the directory deal files are staged in before being sealed into sectors
// for offline deal flow.

View File

@ -21,12 +21,12 @@ func basicTest(t *testing.T, repo Repo) {
}
assert.Nil(t, apima, "with no api endpoint, return should be nil")
lrepo, err := repo.Lock(FullNode)
lrepo, err := repo.Lock(FullNodeRepoType{})
assert.NoError(t, err, "should be able to lock once")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil")
{
lrepo2, err := repo.Lock(FullNode)
lrepo2, err := repo.Lock(FullNodeRepoType{})
if assert.Error(t, err) {
assert.Equal(t, ErrRepoAlreadyLocked, err)
}
@ -36,7 +36,7 @@ func basicTest(t *testing.T, repo Repo) {
err = lrepo.Close()
assert.NoError(t, err, "should be able to unlock")
lrepo, err = repo.Lock(FullNode)
lrepo, err = repo.Lock(FullNodeRepoType{})
assert.NoError(t, err, "should be able to relock")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil")
@ -80,7 +80,7 @@ func basicTest(t *testing.T, repo Repo) {
k1 := types.KeyInfo{Type: "foo"}
k2 := types.KeyInfo{Type: "bar"}
lrepo, err = repo.Lock(FullNode)
lrepo, err = repo.Lock(FullNodeRepoType{})
assert.NoError(t, err, "should be able to relock")
assert.NotNil(t, lrepo, "locked repo shouldn't be nil")

View File

@ -138,7 +138,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
return nil, err
}
err = minerRepo.Init(repo.StorageMiner)
err = minerRepo.Init(repo.StorageMinerRepoType{})
if err != nil {
return nil, err
}
@ -146,7 +146,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
var subsystems config.MinerSubsystemConfig
{
lr, err := minerRepo.Lock(repo.StorageMiner)
lr, err := minerRepo.Lock(repo.StorageMinerRepoType{})
if err != nil {
return nil, err
}
@ -244,7 +244,7 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
return nil, err
}
err = nodeRepo.Init(repo.FullNode)
err = nodeRepo.Init(repo.FullNodeRepoType{})
if err != nil {
return nil, err
}