From 0e6ff668ebec795021249021214751ac4c18774f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sun, 30 Aug 2020 20:28:58 +0200 Subject: [PATCH] worker: Cli to attach storage paths --- api/api_worker.go | 2 + api/apistruct/struct.go | 5 ++ cli/cmd.go | 17 +++++ cmd/lotus-seal-worker/info.go | 71 +++++++++++++++++++++ cmd/lotus-seal-worker/main.go | 35 ++++++++++- cmd/lotus-seal-worker/rpc.go | 29 ++++++++- cmd/lotus-seal-worker/storage.go | 105 +++++++++++++++++++++++++++++++ 7 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 cmd/lotus-seal-worker/info.go create mode 100644 cmd/lotus-seal-worker/storage.go diff --git a/api/api_worker.go b/api/api_worker.go index 5b7cdc7e4..e65ea5f1d 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -32,6 +32,8 @@ type WorkerAPI interface { UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) + StorageAddLocal(ctx context.Context, path string) error + Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) error Closing(context.Context) (<-chan struct{}, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index dca135e6a..2422d7cd8 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -318,6 +318,7 @@ type WorkerStruct struct { ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"` Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` + StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"` ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) `perm:"admin"` @@ -1223,6 +1224,10 @@ func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) err return w.Internal.MoveStorage(ctx, sector) } +func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error { + return w.Internal.StorageAddLocal(ctx, path) +} + func (w *WorkerStruct) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error { return w.Internal.UnsealPiece(ctx, id, index, size, randomness, c) } diff --git a/cli/cmd.go b/cli/cmd.go index 77a0ef886..c6617dcfd 100644 --- a/cli/cmd.go +++ b/cli/cmd.go @@ -75,6 +75,8 @@ func flagForAPI(t repo.RepoType) string { return "api" case repo.StorageMiner: return "miner-api" + case repo.Worker: + return "worker-api" default: panic(fmt.Sprintf("Unknown repo type: %v", t)) } @@ -86,6 +88,8 @@ func flagForRepo(t repo.RepoType) string { return "repo" case repo.StorageMiner: return "miner-repo" + case repo.Worker: + return "worker-repo" default: panic(fmt.Sprintf("Unknown repo type: %v", t)) } @@ -97,6 +101,8 @@ func envForRepo(t repo.RepoType) string { return "FULLNODE_API_INFO" case repo.StorageMiner: return "MINER_API_INFO" + case repo.Worker: + return "WORKER_API_INFO" default: panic(fmt.Sprintf("Unknown repo type: %v", t)) } @@ -109,6 +115,8 @@ func envForRepoDeprecation(t repo.RepoType) string { return "FULLNODE_API_INFO" case repo.StorageMiner: return "STORAGE_API_INFO" + case repo.Worker: + return "WORKER_API_INFO" default: panic(fmt.Sprintf("Unknown repo type: %v", t)) } @@ -234,6 +242,15 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...jsonrpc.Option) (api.StorageMi return client.NewStorageMinerRPC(ctx.Context, addr, headers, opts...) } +func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error) { + addr, headers, err := GetRawAPI(ctx, repo.Worker) + if err != nil { + return nil, nil, err + } + + return client.NewWorkerRPC(ctx.Context, addr, headers) +} + func DaemonContext(cctx *cli.Context) context.Context { if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok { return mtCtx.(context.Context) diff --git a/cmd/lotus-seal-worker/info.go b/cmd/lotus-seal-worker/info.go new file mode 100644 index 000000000..9b08a0c80 --- /dev/null +++ b/cmd/lotus-seal-worker/info.go @@ -0,0 +1,71 @@ +package main + +import ( + "fmt" + + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/chain/types" + lcli "github.com/filecoin-project/lotus/cli" +) + +var infoCmd = &cli.Command{ + Name: "info", + Usage: "Print worker info", + Action: func(cctx *cli.Context) error { + api, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + + ctx := lcli.ReqContext(cctx) + + ver, err := api.Version(ctx) + if err != nil { + return xerrors.Errorf("getting version: %w", err) + } + + fmt.Println("Worker version: ", ver) + fmt.Print("CLI version: ") + cli.VersionPrinter(cctx) + fmt.Println() + + info, err := api.Info(ctx) + if err != nil { + return xerrors.Errorf("getting info: %w", err) + } + + fmt.Printf("Hostname: %s\n", info.Hostname) + fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs) + fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap))) + fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved))) + fmt.Println() + + paths, err := api.Paths(ctx) + if err != nil { + return xerrors.Errorf("getting path info: %w", err) + } + + for _, path := range paths { + fmt.Printf("%s:\n", path.ID) + fmt.Printf("\tWeight: %d; Use: ", path.Weight) + if path.CanSeal || path.CanStore { + fmt.Printf("Weight: %d; Use: ", path.Weight) + if path.CanSeal { + fmt.Print("Seal ") + } + if path.CanStore { + fmt.Print("Store") + } + fmt.Println("") + } else { + fmt.Print("Use: ReadOnly") + } + fmt.Printf("\tLocal: %s\n", path.LocalPath) + } + + return nil + }, +} diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index 1b6ed2782..e6361d3cf 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" + manet "github.com/multiformats/go-multiaddr/net" "github.com/urfave/cli/v2" "golang.org/x/xerrors" @@ -46,10 +47,10 @@ const FlagWorkerRepoDeprecation = "workerrepo" func main() { lotuslog.SetupLogLevels() - log.Info("Starting lotus worker") - local := []*cli.Command{ runCmd, + infoCmd, + storageCmd, } app := &cli.App{ @@ -153,6 +154,8 @@ var runCmd = &cli.Command{ return nil }, Action: func(cctx *cli.Context) error { + log.Info("Starting lotus worker") + if !cctx.Bool("enable-gpu-proving") { if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil { return xerrors.Errorf("could not set no-gpu env: %+v", err) @@ -342,6 +345,8 @@ var runCmd = &cli.Command{ SealProof: spt, TaskTypes: taskTypes, }, remote, localStore, nodeApi), + localStore: localStore, + ls: lr, } mux := mux.NewRouter() @@ -383,6 +388,32 @@ var runCmd = &cli.Command{ return err } + { + a, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + return xerrors.Errorf("parsing address: %w", err) + } + + ma, err := manet.FromNetAddr(a) + if err != nil { + return xerrors.Errorf("creating api multiaddress: %w", err) + } + + if err := lr.SetAPIEndpoint(ma); err != nil { + return xerrors.Errorf("setting api endpoint: %w", err) + } + + ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner) + if err != nil { + return xerrors.Errorf("could not get miner API info: %w", err) + } + + // TODO: ideally this would be a token with some permissions dropped + if err := lr.SetAPIToken(ainfo.Token); err != nil { + return xerrors.Errorf("setting api token: %w", err) + } + } + log.Info("Waiting for tasks") go func() { diff --git a/cmd/lotus-seal-worker/rpc.go b/cmd/lotus-seal-worker/rpc.go index b8508ccf2..5380fe432 100644 --- a/cmd/lotus-seal-worker/rpc.go +++ b/cmd/lotus-seal-worker/rpc.go @@ -3,19 +3,44 @@ package main import ( "context" + "github.com/mitchellh/go-homedir" + "golang.org/x/xerrors" + "github.com/filecoin-project/specs-storage/storage" - sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" - "github.com/filecoin-project/lotus/build" + sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" ) type worker struct { *sectorstorage.LocalWorker + + localStore *stores.Local + ls stores.LocalStorage } func (w *worker) Version(context.Context) (build.Version, error) { return build.APIVersion, nil } +func (w *worker) StorageAddLocal(ctx context.Context, path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + if err := w.localStore.OpenPath(ctx, path); err != nil { + return xerrors.Errorf("opening local path: %w", err) + } + + if err := w.ls.SetStorage(func(sc *stores.StorageConfig) { + sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path}) + }); err != nil { + return xerrors.Errorf("get storage config: %w", err) + } + + return nil +} + var _ storage.Sealer = &worker{} diff --git a/cmd/lotus-seal-worker/storage.go b/cmd/lotus-seal-worker/storage.go new file mode 100644 index 000000000..39cd3ad5a --- /dev/null +++ b/cmd/lotus-seal-worker/storage.go @@ -0,0 +1,105 @@ +package main + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/google/uuid" + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/extern/sector-storage/stores" +) + +const metaFile = "sectorstore.json" + +var storageCmd = &cli.Command{ + Name: "storage", + Usage: "manage sector storage", + Subcommands: []*cli.Command{ + storageAttachCmd, + }, +} + +var storageAttachCmd = &cli.Command{ + Name: "attach", + Usage: "attach local storage path", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "init", + Usage: "initialize the path first", + }, + &cli.Uint64Flag{ + Name: "weight", + Usage: "(for init) path weight", + Value: 10, + }, + &cli.BoolFlag{ + Name: "seal", + Usage: "(for init) use path for sealing", + }, + &cli.BoolFlag{ + Name: "store", + Usage: "(for init) use path for long-term storage", + }, + }, + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetWorkerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + if !cctx.Args().Present() { + return xerrors.Errorf("must specify storage path to attach") + } + + p, err := homedir.Expand(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("expanding path: %w", err) + } + + if cctx.Bool("init") { + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + return err + } + } + + _, err := os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + if err == nil { + return xerrors.Errorf("path is already initialized") + } + return err + } + + cfg := &stores.LocalStorageMeta{ + ID: stores.ID(uuid.New().String()), + Weight: cctx.Uint64("weight"), + CanSeal: cctx.Bool("seal"), + CanStore: cctx.Bool("store"), + } + + if !(cfg.CanStore || cfg.CanSeal) { + return xerrors.Errorf("must specify at least one of --store of --seal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return xerrors.Errorf("marshaling storage config: %w", err) + } + + if err := ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil { + return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err) + } + } + + return nodeApi.StorageAddLocal(ctx, p) + }, +}