From 7e97f14109738d9483bb770ed4a6ab81c9385060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 8 Feb 2024 13:27:46 +0100 Subject: [PATCH] lotus-provider: cli command for easy cluster rpc access --- api/api_lp.go | 2 + cmd/lotus-provider/cli.go | 173 ++++++++++++++++++++++++++++++++ cmd/lotus-provider/deps/deps.go | 5 +- cmd/lotus-provider/main.go | 7 +- cmd/lotus-provider/rpc/rpc.go | 48 +++++++++ cmd/lotus-provider/storage.go | 158 +++++++++++++++++++++++++++++ 6 files changed, 385 insertions(+), 8 deletions(-) create mode 100644 cmd/lotus-provider/cli.go create mode 100644 cmd/lotus-provider/storage.go diff --git a/api/api_lp.go b/api/api_lp.go index 447a3de6d..97eb2af3e 100644 --- a/api/api_lp.go +++ b/api/api_lp.go @@ -13,6 +13,8 @@ type LotusProvider interface { AllocatePieceToSector(ctx context.Context, maddr address.Address, piece PieceDealInfo, rawSize int64, source url.URL, header http.Header) (SectorOffset, error) //perm:write + StorageAddLocal(ctx context.Context, path string) error //perm:admin + // Trigger shutdown Shutdown(context.Context) error //perm:admin } diff --git a/cmd/lotus-provider/cli.go b/cmd/lotus-provider/cli.go new file mode 100644 index 000000000..ace9c2e17 --- /dev/null +++ b/cmd/lotus-provider/cli.go @@ -0,0 +1,173 @@ +package main + +import ( + "bufio" + "encoding/base64" + "fmt" + "github.com/BurntSushi/toml" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/lotus/api" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/gbrlsnchs/jwt/v3" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + "net" + "os" + "time" +) + +const providerEnvVar = "PROVIDER_API_INFO" + +var cliCmd = &cli.Command{ + Name: "cli", + Usage: "Execute cli commands", + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "machine", + Usage: "machine host:port", + }, + }, + Before: func(cctx *cli.Context) error { + if os.Getenv(providerEnvVar) != "" { + // set already + return nil + } + + db, err := deps.MakeDB(cctx) + if err != nil { + return err + } + + ctx := lcli.ReqContext(cctx) + + machine := cctx.String("machine") + if machine == "" { + // interactive picker + var machines []struct { + HostAndPort string `db:"host_and_port"` + LastContact time.Time `db:"last_contact"` + } + + err := db.Select(ctx, &machines, "select host_and_port, last_contact from harmony_machines") + if err != nil { + return xerrors.Errorf("getting machine list: %w", err) + } + + now := time.Now() + fmt.Println("Available machines:") + for i, m := range machines { + // A machine is healthy if contacted not longer than 2 minutes ago + healthStatus := "unhealthy" + if now.Sub(m.LastContact) <= 2*time.Minute { + healthStatus = "healthy" + } + fmt.Printf("%d. %s %s\n", i+1, m.HostAndPort, healthStatus) + } + + fmt.Print("Select: ") + reader := bufio.NewReader(os.Stdin) + input, err := reader.ReadString('\n') + if err != nil { + return xerrors.Errorf("reading selection: %w", err) + } + + var selection int + _, err = fmt.Sscanf(input, "%d", &selection) + if err != nil { + return xerrors.Errorf("parsing selection: %w", err) + } + + if selection < 1 || selection > len(machines) { + return xerrors.New("invalid selection") + } + } + + var apiKeys []string + { + var dbconfigs []struct { + Config string `db:"config"` + Title string `db:"title"` + } + + err := db.Select(ctx, &dbconfigs, "select config from harmony_config") + if err != nil { + return xerrors.Errorf("getting configs: %w", err) + } + + var seen = make(map[string]struct{}) + + for _, config := range dbconfigs { + var layer struct { + Apis struct { + StorageRPCSecret string + } + } + + if _, err := toml.Decode(config.Config, &layer); err != nil { + return xerrors.Errorf("decode config layer %s: %w", config.Title, err) + } + + if layer.Apis.StorageRPCSecret != "" { + if _, ok := seen[layer.Apis.StorageRPCSecret]; ok { + continue + } + seen[layer.Apis.StorageRPCSecret] = struct{}{} + apiKeys = append(apiKeys, layer.Apis.StorageRPCSecret) + } + } + } + + if len(apiKeys) == 0 { + return xerrors.New("no api keys found in the database") + } + if len(apiKeys) > 1 { + return xerrors.Errorf("multiple api keys found in the database, not supported yet") + } + + var apiToken []byte + { + type jwtPayload struct { + Allow []auth.Permission + } + + p := jwtPayload{ + Allow: api.AllPermissions, + } + + sk, err := base64.StdEncoding.DecodeString(apiKeys[0]) + if err != nil { + return xerrors.Errorf("decode secret: %w", err) + } + + apiToken, err = jwt.Sign(&p, jwt.NewHS256(sk)) + if err != nil { + return xerrors.Errorf("signing token: %w", err) + } + } + + { + + laddr, err := net.ResolveTCPAddr("tcp", machine) + if err != nil { + return xerrors.Errorf("net resolve: %w", err) + } + + if len(laddr.IP) == 0 { + // set localhost + laddr.IP = net.IPv4(127, 0, 0, 1) + } + + ma, err := manet.FromNetAddr(laddr) + if err != nil { + return xerrors.Errorf("net from addr (%v): %w", laddr, err) + } + + fmt.Printf("Token: %s:%s\n", string(apiToken), ma) + } + + return nil + }, + Subcommands: []*cli.Command{}, +} diff --git a/cmd/lotus-provider/deps/deps.go b/cmd/lotus-provider/deps/deps.go index da8eeb8aa..3a26c1468 100644 --- a/cmd/lotus-provider/deps/deps.go +++ b/cmd/lotus-provider/deps/deps.go @@ -101,6 +101,7 @@ type Deps struct { Stor *paths.Remote Si *paths.DBIndex LocalStore *paths.Local + LocalPaths *paths.BasicLocalStorage ListenAddr string } @@ -193,7 +194,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, }() } - bls := &paths.BasicLocalStorage{ + deps.LocalPaths = &paths.BasicLocalStorage{ PathToJSON: cctx.String("storage-json"), } @@ -212,7 +213,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, } } if deps.LocalStore == nil { - deps.LocalStore, err = paths.NewLocal(ctx, bls, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"}) + deps.LocalStore, err = paths.NewLocal(ctx, deps.LocalPaths, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"}) if err != nil { return err } diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index d6185cc3c..a5cd3f452 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -42,18 +42,13 @@ func main() { local := []*cli.Command{ //initCmd, + cliCmd, runCmd, stopCmd, configCmd, testCmd, webCmd, pipelineCmd, - //backupCmd, - //lcli.WithCategory("chain", actorCmd), - //lcli.WithCategory("storage", sectorsCmd), - //lcli.WithCategory("storage", provingCmd), - //lcli.WithCategory("storage", storageCmd), - //lcli.WithCategory("storage", sealingCmd), } jaeger := tracing.SetupJaegerTracing("lotus") diff --git a/cmd/lotus-provider/rpc/rpc.go b/cmd/lotus-provider/rpc/rpc.go index 4734843fa..99a7adabf 100644 --- a/cmd/lotus-provider/rpc/rpc.go +++ b/cmd/lotus-provider/rpc/rpc.go @@ -5,6 +5,12 @@ import ( "context" "encoding/base64" "encoding/json" + "github.com/filecoin-project/lotus/api/client" + cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" "net" "net/http" "net/url" @@ -90,6 +96,25 @@ func (p *ProviderAPI) Shutdown(context.Context) error { return nil } +func (p *ProviderAPI) StorageAddLocal(ctx context.Context, path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + if err := p.LocalStore.OpenPath(ctx, path); err != nil { + return xerrors.Errorf("opening local path: %w", err) + } + + if err := p.LocalPaths.SetStorage(func(sc *storiface.StorageConfig) { + sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: path}) + }); err != nil { + return xerrors.Errorf("get storage config: %w", err) + } + + return nil +} + func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error { fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}} remoteHandler := func(w http.ResponseWriter, r *http.Request) { @@ -158,3 +183,26 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c } return eg.Wait() } + +func GetProviderAPI(ctx *cli.Context) (api.LotusProvider, jsonrpc.ClientCloser, error) { + addr, headers, err := cliutil.GetRawAPI(ctx, repo.Provider, "v0") + if err != nil { + return nil, nil, err + } + + u, err := url.Parse(addr) + if err != nil { + return nil, nil, xerrors.Errorf("parsing miner api URL: %w", err) + } + + switch u.Scheme { + case "ws": + u.Scheme = "http" + case "wss": + u.Scheme = "https" + } + + addr = u.String() + + return client.NewProviderRpc(ctx.Context, addr, headers) +} diff --git a/cmd/lotus-provider/storage.go b/cmd/lotus-provider/storage.go new file mode 100644 index 000000000..05d6c248d --- /dev/null +++ b/cmd/lotus-provider/storage.go @@ -0,0 +1,158 @@ +package main + +import ( + "encoding/json" + "github.com/docker/go-units" + lcli "github.com/filecoin-project/lotus/cli" + "github.com/filecoin-project/lotus/cmd/lotus-provider/rpc" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/google/uuid" + "github.com/mitchellh/go-homedir" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + "os" + "path/filepath" +) + +const metaFile = "sectorstore.json" + +var storageCmd = &cli.Command{ + Name: "storage", + Usage: "manage sector storage", + Description: `Sectors can be stored across many filesystem paths. These +commands provide ways to manage the storage the miner will used to store sectors +long term for proving (references as 'store') as well as how sectors will be +stored while moving through the sealing pipeline (references as 'seal').`, + Subcommands: []*cli.Command{ + storageAttachCmd, + /*storageDetachCmd, + storageRedeclareCmd, + storageListCmd, + storageFindCmd, + storageCleanupCmd, + storageLocks,*/ + }, +} + +var storageAttachCmd = &cli.Command{ + Name: "attach", + Usage: "attach local storage path", + ArgsUsage: "[path]", + Description: `Storage can be attached to the miner using this command. The storage volume +list is stored local to the miner in storage.json set in lotus-provider run. We do not +recommend manually modifying this value without further understanding of the +storage system. + +Each storage volume contains a configuration file which describes the +capabilities of the volume. When the '--init' flag is provided, this file will +be created using the additional flags. + +Weight +A high weight value means data will be more likely to be stored in this path + +Seal +Data for the sealing process will be stored here + +Store +Finalized sectors that will be moved here for long term storage and be proven +over time + `, + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "init", + Usage: "initialize the path first", + }, + &cli.Uint64Flag{ + Name: "weight", + Usage: "(for init) path weight", + Value: 10, + }, + &cli.BoolFlag{ + Name: "seal", + Usage: "(for init) use path for sealing", + }, + &cli.BoolFlag{ + Name: "store", + Usage: "(for init) use path for long-term storage", + }, + &cli.StringFlag{ + Name: "max-storage", + Usage: "(for init) limit storage space for sectors (expensive for very large paths!)", + }, + &cli.StringSliceFlag{ + Name: "groups", + Usage: "path group names", + }, + &cli.StringSliceFlag{ + Name: "allow-to", + Usage: "path groups allowed to pull data from this path (allow all if not specified)", + }, + }, + Action: func(cctx *cli.Context) error { + minerApi, closer, err := rpc.GetProviderAPI(cctx) + if err != nil { + return err + } + + defer closer() + ctx := lcli.ReqContext(cctx) + + if cctx.NArg() != 1 { + return lcli.IncorrectNumArgs(cctx) + } + + p, err := homedir.Expand(cctx.Args().First()) + if err != nil { + return xerrors.Errorf("expanding path: %w", err) + } + + if cctx.Bool("init") { + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + return err + } + } + + _, err := os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + if err == nil { + return xerrors.Errorf("path is already initialized") + } + return err + } + + var maxStor int64 + if cctx.IsSet("max-storage") { + maxStor, err = units.RAMInBytes(cctx.String("max-storage")) + if err != nil { + return xerrors.Errorf("parsing max-storage: %w", err) + } + } + + cfg := &storiface.LocalStorageMeta{ + ID: storiface.ID(uuid.New().String()), + Weight: cctx.Uint64("weight"), + CanSeal: cctx.Bool("seal"), + CanStore: cctx.Bool("store"), + MaxStorage: uint64(maxStor), + Groups: cctx.StringSlice("groups"), + AllowTo: cctx.StringSlice("allow-to"), + } + + if !(cfg.CanStore || cfg.CanSeal) { + return xerrors.Errorf("must specify at least one of --store or --seal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + if err != nil { + return xerrors.Errorf("marshaling storage config: %w", err) + } + + if err := os.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil { + return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err) + } + } + + return minerApi.StorageAddLocal(ctx, p) + }, +}