lotus-provider: cli command for easy cluster rpc access

This commit is contained in:
Łukasz Magiera 2024-02-08 13:27:46 +01:00
parent e9c6f9037b
commit 7e97f14109
6 changed files with 385 additions and 8 deletions

View File

@ -13,6 +13,8 @@ type LotusProvider interface {
AllocatePieceToSector(ctx context.Context, maddr address.Address, piece PieceDealInfo, rawSize int64, source url.URL, header http.Header) (SectorOffset, error) //perm:write
StorageAddLocal(ctx context.Context, path string) error //perm:admin
// Trigger shutdown
Shutdown(context.Context) error //perm:admin
}

173
cmd/lotus-provider/cli.go Normal file
View File

@ -0,0 +1,173 @@
package main
import (
"bufio"
"encoding/base64"
"fmt"
"github.com/BurntSushi/toml"
"github.com/filecoin-project/go-jsonrpc/auth"
"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
"github.com/gbrlsnchs/jwt/v3"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"net"
"os"
"time"
)
const providerEnvVar = "PROVIDER_API_INFO"
var cliCmd = &cli.Command{
Name: "cli",
Usage: "Execute cli commands",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "machine",
Usage: "machine host:port",
},
},
Before: func(cctx *cli.Context) error {
if os.Getenv(providerEnvVar) != "" {
// set already
return nil
}
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
ctx := lcli.ReqContext(cctx)
machine := cctx.String("machine")
if machine == "" {
// interactive picker
var machines []struct {
HostAndPort string `db:"host_and_port"`
LastContact time.Time `db:"last_contact"`
}
err := db.Select(ctx, &machines, "select host_and_port, last_contact from harmony_machines")
if err != nil {
return xerrors.Errorf("getting machine list: %w", err)
}
now := time.Now()
fmt.Println("Available machines:")
for i, m := range machines {
// A machine is healthy if contacted not longer than 2 minutes ago
healthStatus := "unhealthy"
if now.Sub(m.LastContact) <= 2*time.Minute {
healthStatus = "healthy"
}
fmt.Printf("%d. %s %s\n", i+1, m.HostAndPort, healthStatus)
}
fmt.Print("Select: ")
reader := bufio.NewReader(os.Stdin)
input, err := reader.ReadString('\n')
if err != nil {
return xerrors.Errorf("reading selection: %w", err)
}
var selection int
_, err = fmt.Sscanf(input, "%d", &selection)
if err != nil {
return xerrors.Errorf("parsing selection: %w", err)
}
if selection < 1 || selection > len(machines) {
return xerrors.New("invalid selection")
}
}
var apiKeys []string
{
var dbconfigs []struct {
Config string `db:"config"`
Title string `db:"title"`
}
err := db.Select(ctx, &dbconfigs, "select config from harmony_config")
if err != nil {
return xerrors.Errorf("getting configs: %w", err)
}
var seen = make(map[string]struct{})
for _, config := range dbconfigs {
var layer struct {
Apis struct {
StorageRPCSecret string
}
}
if _, err := toml.Decode(config.Config, &layer); err != nil {
return xerrors.Errorf("decode config layer %s: %w", config.Title, err)
}
if layer.Apis.StorageRPCSecret != "" {
if _, ok := seen[layer.Apis.StorageRPCSecret]; ok {
continue
}
seen[layer.Apis.StorageRPCSecret] = struct{}{}
apiKeys = append(apiKeys, layer.Apis.StorageRPCSecret)
}
}
}
if len(apiKeys) == 0 {
return xerrors.New("no api keys found in the database")
}
if len(apiKeys) > 1 {
return xerrors.Errorf("multiple api keys found in the database, not supported yet")
}
var apiToken []byte
{
type jwtPayload struct {
Allow []auth.Permission
}
p := jwtPayload{
Allow: api.AllPermissions,
}
sk, err := base64.StdEncoding.DecodeString(apiKeys[0])
if err != nil {
return xerrors.Errorf("decode secret: %w", err)
}
apiToken, err = jwt.Sign(&p, jwt.NewHS256(sk))
if err != nil {
return xerrors.Errorf("signing token: %w", err)
}
}
{
laddr, err := net.ResolveTCPAddr("tcp", machine)
if err != nil {
return xerrors.Errorf("net resolve: %w", err)
}
if len(laddr.IP) == 0 {
// set localhost
laddr.IP = net.IPv4(127, 0, 0, 1)
}
ma, err := manet.FromNetAddr(laddr)
if err != nil {
return xerrors.Errorf("net from addr (%v): %w", laddr, err)
}
fmt.Printf("Token: %s:%s\n", string(apiToken), ma)
}
return nil
},
Subcommands: []*cli.Command{},
}

View File

@ -101,6 +101,7 @@ type Deps struct {
Stor *paths.Remote
Si *paths.DBIndex
LocalStore *paths.Local
LocalPaths *paths.BasicLocalStorage
ListenAddr string
}
@ -193,7 +194,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context,
}()
}
bls := &paths.BasicLocalStorage{
deps.LocalPaths = &paths.BasicLocalStorage{
PathToJSON: cctx.String("storage-json"),
}
@ -212,7 +213,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context,
}
}
if deps.LocalStore == nil {
deps.LocalStore, err = paths.NewLocal(ctx, bls, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"})
deps.LocalStore, err = paths.NewLocal(ctx, deps.LocalPaths, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"})
if err != nil {
return err
}

View File

@ -42,18 +42,13 @@ func main() {
local := []*cli.Command{
//initCmd,
cliCmd,
runCmd,
stopCmd,
configCmd,
testCmd,
webCmd,
pipelineCmd,
//backupCmd,
//lcli.WithCategory("chain", actorCmd),
//lcli.WithCategory("storage", sectorsCmd),
//lcli.WithCategory("storage", provingCmd),
//lcli.WithCategory("storage", storageCmd),
//lcli.WithCategory("storage", sealingCmd),
}
jaeger := tracing.SetupJaegerTracing("lotus")

View File

@ -5,6 +5,12 @@ import (
"context"
"encoding/base64"
"encoding/json"
"github.com/filecoin-project/lotus/api/client"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/node/repo"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"net"
"net/http"
"net/url"
@ -90,6 +96,25 @@ func (p *ProviderAPI) Shutdown(context.Context) error {
return nil
}
func (p *ProviderAPI) StorageAddLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
if err := p.LocalStore.OpenPath(ctx, path); err != nil {
return xerrors.Errorf("opening local path: %w", err)
}
if err := p.LocalPaths.SetStorage(func(sc *storiface.StorageConfig) {
sc.StoragePaths = append(sc.StoragePaths, storiface.LocalPath{Path: path})
}); err != nil {
return xerrors.Errorf("get storage config: %w", err)
}
return nil
}
func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error {
fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}}
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
@ -158,3 +183,26 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
}
return eg.Wait()
}
func GetProviderAPI(ctx *cli.Context) (api.LotusProvider, jsonrpc.ClientCloser, error) {
addr, headers, err := cliutil.GetRawAPI(ctx, repo.Provider, "v0")
if err != nil {
return nil, nil, err
}
u, err := url.Parse(addr)
if err != nil {
return nil, nil, xerrors.Errorf("parsing miner api URL: %w", err)
}
switch u.Scheme {
case "ws":
u.Scheme = "http"
case "wss":
u.Scheme = "https"
}
addr = u.String()
return client.NewProviderRpc(ctx.Context, addr, headers)
}

View File

@ -0,0 +1,158 @@
package main
import (
"encoding/json"
"github.com/docker/go-units"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/cmd/lotus-provider/rpc"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"os"
"path/filepath"
)
const metaFile = "sectorstore.json"
var storageCmd = &cli.Command{
Name: "storage",
Usage: "manage sector storage",
Description: `Sectors can be stored across many filesystem paths. These
commands provide ways to manage the storage the miner will used to store sectors
long term for proving (references as 'store') as well as how sectors will be
stored while moving through the sealing pipeline (references as 'seal').`,
Subcommands: []*cli.Command{
storageAttachCmd,
/*storageDetachCmd,
storageRedeclareCmd,
storageListCmd,
storageFindCmd,
storageCleanupCmd,
storageLocks,*/
},
}
var storageAttachCmd = &cli.Command{
Name: "attach",
Usage: "attach local storage path",
ArgsUsage: "[path]",
Description: `Storage can be attached to the miner using this command. The storage volume
list is stored local to the miner in storage.json set in lotus-provider run. We do not
recommend manually modifying this value without further understanding of the
storage system.
Each storage volume contains a configuration file which describes the
capabilities of the volume. When the '--init' flag is provided, this file will
be created using the additional flags.
Weight
A high weight value means data will be more likely to be stored in this path
Seal
Data for the sealing process will be stored here
Store
Finalized sectors that will be moved here for long term storage and be proven
over time
`,
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "init",
Usage: "initialize the path first",
},
&cli.Uint64Flag{
Name: "weight",
Usage: "(for init) path weight",
Value: 10,
},
&cli.BoolFlag{
Name: "seal",
Usage: "(for init) use path for sealing",
},
&cli.BoolFlag{
Name: "store",
Usage: "(for init) use path for long-term storage",
},
&cli.StringFlag{
Name: "max-storage",
Usage: "(for init) limit storage space for sectors (expensive for very large paths!)",
},
&cli.StringSliceFlag{
Name: "groups",
Usage: "path group names",
},
&cli.StringSliceFlag{
Name: "allow-to",
Usage: "path groups allowed to pull data from this path (allow all if not specified)",
},
},
Action: func(cctx *cli.Context) error {
minerApi, closer, err := rpc.GetProviderAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
if cctx.NArg() != 1 {
return lcli.IncorrectNumArgs(cctx)
}
p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}
if cctx.Bool("init") {
if err := os.MkdirAll(p, 0755); err != nil {
if !os.IsExist(err) {
return err
}
}
_, err := os.Stat(filepath.Join(p, metaFile))
if !os.IsNotExist(err) {
if err == nil {
return xerrors.Errorf("path is already initialized")
}
return err
}
var maxStor int64
if cctx.IsSet("max-storage") {
maxStor, err = units.RAMInBytes(cctx.String("max-storage"))
if err != nil {
return xerrors.Errorf("parsing max-storage: %w", err)
}
}
cfg := &storiface.LocalStorageMeta{
ID: storiface.ID(uuid.New().String()),
Weight: cctx.Uint64("weight"),
CanSeal: cctx.Bool("seal"),
CanStore: cctx.Bool("store"),
MaxStorage: uint64(maxStor),
Groups: cctx.StringSlice("groups"),
AllowTo: cctx.StringSlice("allow-to"),
}
if !(cfg.CanStore || cfg.CanSeal) {
return xerrors.Errorf("must specify at least one of --store or --seal")
}
b, err := json.MarshalIndent(cfg, "", " ")
if err != nil {
return xerrors.Errorf("marshaling storage config: %w", err)
}
if err := os.WriteFile(filepath.Join(p, metaFile), b, 0644); err != nil {
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(p, metaFile), err)
}
}
return minerApi.StorageAddLocal(ctx, p)
},
}