Merge pull request #3405 from filecoin-project/feat/worker-storage-cli

worker: Cli to attach storage paths
This commit is contained in:
Łukasz Magiera 2020-08-31 10:09:20 +02:00 committed by GitHub
commit c96613a499
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 260 additions and 4 deletions

View File

@ -32,6 +32,8 @@ type WorkerAPI interface {
UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error UnsealPiece(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error
ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) ReadPiece(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error)
StorageAddLocal(ctx context.Context, path string) error
Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) error Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) error
Closing(context.Context) (<-chan struct{}, error) Closing(context.Context) (<-chan struct{}, error)

View File

@ -318,6 +318,7 @@ type WorkerStruct struct {
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"` ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` MoveStorage func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"` UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) error `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) `perm:"admin"` ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (bool, error) `perm:"admin"`
@ -1223,6 +1224,10 @@ func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID) err
return w.Internal.MoveStorage(ctx, sector) return w.Internal.MoveStorage(ctx, sector)
} }
func (w *WorkerStruct) StorageAddLocal(ctx context.Context, path string) error {
return w.Internal.StorageAddLocal(ctx, path)
}
func (w *WorkerStruct) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error { func (w *WorkerStruct) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, c cid.Cid) error {
return w.Internal.UnsealPiece(ctx, id, index, size, randomness, c) return w.Internal.UnsealPiece(ctx, id, index, size, randomness, c)
} }

View File

@ -75,6 +75,8 @@ func flagForAPI(t repo.RepoType) string {
return "api" return "api"
case repo.StorageMiner: case repo.StorageMiner:
return "miner-api" return "miner-api"
case repo.Worker:
return "worker-api"
default: default:
panic(fmt.Sprintf("Unknown repo type: %v", t)) panic(fmt.Sprintf("Unknown repo type: %v", t))
} }
@ -86,6 +88,8 @@ func flagForRepo(t repo.RepoType) string {
return "repo" return "repo"
case repo.StorageMiner: case repo.StorageMiner:
return "miner-repo" return "miner-repo"
case repo.Worker:
return "worker-repo"
default: default:
panic(fmt.Sprintf("Unknown repo type: %v", t)) panic(fmt.Sprintf("Unknown repo type: %v", t))
} }
@ -97,6 +101,8 @@ func envForRepo(t repo.RepoType) string {
return "FULLNODE_API_INFO" return "FULLNODE_API_INFO"
case repo.StorageMiner: case repo.StorageMiner:
return "MINER_API_INFO" return "MINER_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default: default:
panic(fmt.Sprintf("Unknown repo type: %v", t)) panic(fmt.Sprintf("Unknown repo type: %v", t))
} }
@ -109,6 +115,8 @@ func envForRepoDeprecation(t repo.RepoType) string {
return "FULLNODE_API_INFO" return "FULLNODE_API_INFO"
case repo.StorageMiner: case repo.StorageMiner:
return "STORAGE_API_INFO" return "STORAGE_API_INFO"
case repo.Worker:
return "WORKER_API_INFO"
default: default:
panic(fmt.Sprintf("Unknown repo type: %v", t)) panic(fmt.Sprintf("Unknown repo type: %v", t))
} }
@ -234,6 +242,15 @@ func GetStorageMinerAPI(ctx *cli.Context, opts ...jsonrpc.Option) (api.StorageMi
return client.NewStorageMinerRPC(ctx.Context, addr, headers, opts...) return client.NewStorageMinerRPC(ctx.Context, addr, headers, opts...)
} }
func GetWorkerAPI(ctx *cli.Context) (api.WorkerAPI, jsonrpc.ClientCloser, error) {
addr, headers, err := GetRawAPI(ctx, repo.Worker)
if err != nil {
return nil, nil, err
}
return client.NewWorkerRPC(ctx.Context, addr, headers)
}
func DaemonContext(cctx *cli.Context) context.Context { func DaemonContext(cctx *cli.Context) context.Context {
if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok { if mtCtx, ok := cctx.App.Metadata[metadataTraceContext]; ok {
return mtCtx.(context.Context) return mtCtx.(context.Context)

View File

@ -0,0 +1,71 @@
package main
import (
"fmt"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
)
var infoCmd = &cli.Command{
Name: "info",
Usage: "Print worker info",
Action: func(cctx *cli.Context) error {
api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
ver, err := api.Version(ctx)
if err != nil {
return xerrors.Errorf("getting version: %w", err)
}
fmt.Println("Worker version: ", ver)
fmt.Print("CLI version: ")
cli.VersionPrinter(cctx)
fmt.Println()
info, err := api.Info(ctx)
if err != nil {
return xerrors.Errorf("getting info: %w", err)
}
fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
fmt.Println()
paths, err := api.Paths(ctx)
if err != nil {
return xerrors.Errorf("getting path info: %w", err)
}
for _, path := range paths {
fmt.Printf("%s:\n", path.ID)
fmt.Printf("\tWeight: %d; Use: ", path.Weight)
if path.CanSeal || path.CanStore {
fmt.Printf("Weight: %d; Use: ", path.Weight)
if path.CanSeal {
fmt.Print("Seal ")
}
if path.CanStore {
fmt.Print("Store")
}
fmt.Println("")
} else {
fmt.Print("Use: ReadOnly")
}
fmt.Printf("\tLocal: %s\n", path.LocalPath)
}
return nil
},
}

View File

@ -16,6 +16,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/gorilla/mux" "github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2" "github.com/urfave/cli/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -46,10 +47,10 @@ const FlagWorkerRepoDeprecation = "workerrepo"
func main() { func main() {
lotuslog.SetupLogLevels() lotuslog.SetupLogLevels()
log.Info("Starting lotus worker")
local := []*cli.Command{ local := []*cli.Command{
runCmd, runCmd,
infoCmd,
storageCmd,
} }
app := &cli.App{ app := &cli.App{
@ -153,6 +154,8 @@ var runCmd = &cli.Command{
return nil return nil
}, },
Action: func(cctx *cli.Context) error { Action: func(cctx *cli.Context) error {
log.Info("Starting lotus worker")
if !cctx.Bool("enable-gpu-proving") { if !cctx.Bool("enable-gpu-proving") {
if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil { if err := os.Setenv("BELLMAN_NO_GPU", "true"); err != nil {
return xerrors.Errorf("could not set no-gpu env: %+v", err) return xerrors.Errorf("could not set no-gpu env: %+v", err)
@ -342,6 +345,8 @@ var runCmd = &cli.Command{
SealProof: spt, SealProof: spt,
TaskTypes: taskTypes, TaskTypes: taskTypes,
}, remote, localStore, nodeApi), }, remote, localStore, nodeApi),
localStore: localStore,
ls: lr,
} }
mux := mux.NewRouter() mux := mux.NewRouter()
@ -383,6 +388,32 @@ var runCmd = &cli.Command{
return err return err
} }
{
a, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return xerrors.Errorf("parsing address: %w", err)
}
ma, err := manet.FromNetAddr(a)
if err != nil {
return xerrors.Errorf("creating api multiaddress: %w", err)
}
if err := lr.SetAPIEndpoint(ma); err != nil {
return xerrors.Errorf("setting api endpoint: %w", err)
}
ainfo, err := lcli.GetAPIInfo(cctx, repo.StorageMiner)
if err != nil {
return xerrors.Errorf("could not get miner API info: %w", err)
}
// TODO: ideally this would be a token with some permissions dropped
if err := lr.SetAPIToken(ainfo.Token); err != nil {
return xerrors.Errorf("setting api token: %w", err)
}
}
log.Info("Waiting for tasks") log.Info("Waiting for tasks")
go func() { go func() {

View File

@ -3,19 +3,44 @@ package main
import ( import (
"context" "context"
"github.com/mitchellh/go-homedir"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
) )
type worker struct { type worker struct {
*sectorstorage.LocalWorker *sectorstorage.LocalWorker
localStore *stores.Local
ls stores.LocalStorage
} }
func (w *worker) Version(context.Context) (build.Version, error) { func (w *worker) Version(context.Context) (build.Version, error) {
return build.APIVersion, nil return build.APIVersion, nil
} }
func (w *worker) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := w.localStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := w.ls.SetStorage(func(sc *stores.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
return nil
}
var _ storage.Sealer = &worker{} var _ storage.Sealer = &worker{}

View File

@ -0,0 +1,105 @@
package main
import (
"encoding/json"
"io/ioutil"
"os"
"path/filepath"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
)
const metaFile = "sectorstore.json"
var storageCmd = &cli.Command{
Name: "storage",
Usage: "manage sector storage",
Subcommands: []*cli.Command{
storageAttachCmd,
},
}
var storageAttachCmd = &cli.Command{
Name: "attach",
Usage: "attach local storage path",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "init",
Usage: "initialize the path first",
},
&cli.Uint64Flag{
Name: "weight",
Usage: "(for init) path weight",
Value: 10,
},
&cli.BoolFlag{
Name: "seal",
Usage: "(for init) use path for sealing",
},
&cli.BoolFlag{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if !cctx.Args().Present() {
return xerrors.Errorf("must specify storage path to attach")
}
p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}
if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}
_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
if err == nil {
return xerrors.Errorf("path is already initialized")
}
return err
}
cfg := &stores.LocalStorageMeta{
ID: stores.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
}
if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store of --seal")
}
b, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err)
}
}
return nodeApi.StorageAddLocal(ctx, p)
},
}