From d7ea5561dcf4b189537c77d80f1b01eb896721b1 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 4 Dec 2023 22:30:40 -0600 Subject: [PATCH] maint: break out lp deps for easy testing --- cmd/lotus-provider/config.go | 47 +--- cmd/lotus-provider/deps/deps.go | 270 +++++++++++++++++++++++ cmd/lotus-provider/main.go | 9 +- cmd/lotus-provider/proving.go | 23 +- cmd/lotus-provider/rpc/rpc.go | 91 +++++++- cmd/lotus-provider/run.go | 343 +----------------------------- cmd/lotus-provider/tasks/tasks.go | 58 +++++ 7 files changed, 450 insertions(+), 391 deletions(-) create mode 100644 cmd/lotus-provider/deps/deps.go create mode 100644 cmd/lotus-provider/tasks/tasks.go diff --git a/cmd/lotus-provider/config.go b/cmd/lotus-provider/config.go index 5bd681429..49eed327a 100644 --- a/cmd/lotus-provider/config.go +++ b/cmd/lotus-provider/config.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "errors" "fmt" "io" @@ -14,7 +13,7 @@ import ( "github.com/urfave/cli/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/node/config" ) @@ -77,7 +76,7 @@ var configSetCmd = &cli.Command{ Action: func(cctx *cli.Context) error { args := cctx.Args() - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -131,7 +130,7 @@ var configGetCmd = &cli.Command{ if args.Len() != 1 { return fmt.Errorf("want 1 layer arg, got %d", args.Len()) } - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -153,7 +152,7 @@ var configListCmd = &cli.Command{ Usage: "List config layers you can get.", Flags: []cli.Flag{}, Action: func(cctx *cli.Context) error { - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -180,7 +179,7 @@ var configRmCmd = &cli.Command{ if args.Len() != 1 { return errors.New("must have exactly 1 arg for the layer name") } - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -209,11 +208,11 @@ var configViewCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } - lp, err := getConfig(cctx, db) + lp, err := deps.GetConfig(cctx, db) if err != nil { return err } @@ -225,35 +224,3 @@ var configViewCmd = &cli.Command{ return nil }, } - -func getConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) { - lp := config.DefaultLotusProvider() - have := []string{} - layers := cctx.StringSlice("layers") - for _, layer := range layers { - text := "" - err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - if err != nil { - if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { - return nil, fmt.Errorf("missing layer '%s' ", layer) - } - if layer == "base" { - return nil, errors.New(`lotus-provider defaults to a layer named 'base'. - Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`) - } - return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) - } - meta, err := toml.Decode(text, &lp) - if err != nil { - return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) - } - for _, k := range meta.Keys() { - have = append(have, strings.Join(k, " ")) - } - } - _ = have // FUTURE: verify that required fields are here. - // If config includes 3rd-party config, consider JSONSchema as a way that - // 3rd-parties can dynamically include config requirements and we can - // validate the config. Because of layering, we must validate @ startup. - return lp, nil -} diff --git a/cmd/lotus-provider/deps/deps.go b/cmd/lotus-provider/deps/deps.go new file mode 100644 index 000000000..f82a1ecd6 --- /dev/null +++ b/cmd/lotus-provider/deps/deps.go @@ -0,0 +1,270 @@ +// Package deps provides the dependencies for the lotus provider node. +package deps + +import ( + "context" + "database/sql" + "encoding/base64" + "errors" + "fmt" + "net" + "net/http" + "strings" + + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + logging "github.com/ipfs/go-log/v2" + + "github.com/BurntSushi/toml" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-statestore" + "github.com/filecoin-project/lotus/api" + cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/journal" + "github.com/filecoin-project/lotus/journal/alerting" + "github.com/filecoin-project/lotus/journal/fsjournal" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/storage/ctladdr" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" + "github.com/filecoin-project/lotus/storage/sealer/storiface" + "github.com/gbrlsnchs/jwt/v3" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" +) + +var log = logging.Logger("lotus-provider/deps") + +func MakeDB(cctx *cli.Context) (*harmonydb.DB, error) { + dbConfig := config.HarmonyDB{ + Username: cctx.String("db-user"), + Password: cctx.String("db-password"), + Hosts: strings.Split(cctx.String("db-host"), ","), + Database: cctx.String("db-name"), + Port: cctx.String("db-port"), + } + return harmonydb.NewFromConfig(dbConfig) +} + +type JwtPayload struct { + Allow []auth.Permission +} + +func StorageAuth(apiKey string) (sealer.StorageAuth, error) { + if apiKey == "" { + return nil, xerrors.Errorf("no api key provided") + } + + rawKey, err := base64.StdEncoding.DecodeString(apiKey) + if err != nil { + return nil, xerrors.Errorf("decoding api key: %w", err) + } + + key := jwt.NewHS256(rawKey) + + p := JwtPayload{ + Allow: []auth.Permission{"admin"}, + } + + token, err := jwt.Sign(&p, key) + if err != nil { + return nil, err + } + + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(token)) + return sealer.StorageAuth(headers), nil +} + +type Deps struct { + Cfg *config.LotusProviderConfig + DB *harmonydb.DB + Full api.FullNode + Verif storiface.Verifier + LW *sealer.LocalWorker + As *ctladdr.AddressSelector + Maddrs []dtypes.MinerAddress + Stor *paths.Remote + Si *paths.DBIndex + LocalStore *paths.Local + ListenAddr string +} + +const ( + FlagRepoPath = "repo-path" +) + +func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, makeRepo bool) error { + + var err error + if makeRepo { + // Open repo + repoPath := cctx.String(FlagRepoPath) + fmt.Println("repopath", repoPath) + r, err := repo.NewFS(repoPath) + if err != nil { + return err + } + + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { + if err := r.Init(repo.Provider); err != nil { + return err + } + } + } + + if deps.Cfg == nil { + deps.DB, err = MakeDB(cctx) + if err != nil { + return err + } + } + + if deps.Cfg == nil { + // The config feeds into task runners & their helpers + deps.Cfg, err = GetConfig(cctx, deps.DB) + if err != nil { + return err + } + } + + log.Debugw("config", "config", deps.Cfg) + + if deps.Verif == nil { + deps.Verif = ffiwrapper.ProofVerifier + } + + if deps.As == nil { + deps.As, err = provider.AddressSelector(&deps.Cfg.Addresses)() + if err != nil { + return err + } + } + + if deps.Si == nil { + de, err := journal.ParseDisabledEvents(deps.Cfg.Journal.DisabledEvents) + if err != nil { + return err + } + j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) + if err != nil { + return err + } + defer func() { + <-ctx.Done() + _ = j.Close() + }() + + al := alerting.NewAlertingSystem(j) + deps.Si = paths.NewDBIndex(al, deps.DB) + } + + if deps.Full == nil { + var fullCloser func() + deps.Full, fullCloser, err = cliutil.GetFullNodeAPIV1LotusProvider(cctx, deps.Cfg.Apis.ChainApiInfo) + if err != nil { + return err + } + + go func() { + <-ctx.Done() + fullCloser() + }() + } + + bls := &paths.BasicLocalStorage{ + PathToJSON: cctx.String("storage-json"), + } + + if deps.ListenAddr == "" { + listenAddr := cctx.String("listen") + const unspecifiedAddress = "0.0.0.0" + addressSlice := strings.Split(listenAddr, ":") + if ip := net.ParseIP(addressSlice[0]); ip != nil { + if ip.String() == unspecifiedAddress { + rip, err := deps.DB.GetRoutableIP() + if err != nil { + return err + } + deps.ListenAddr = rip + ":" + addressSlice[1] + } + } + } + if deps.LocalStore == nil { + deps.LocalStore, err = paths.NewLocal(ctx, bls, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"}) + if err != nil { + return err + } + } + + sa, err := StorageAuth(deps.Cfg.Apis.StorageRPCSecret) + if err != nil { + return xerrors.Errorf(`'%w' while parsing the config toml's + [Apis] + StorageRPCSecret=%v +Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, deps.Cfg.Apis.StorageRPCSecret) + } + if deps.Stor == nil { + deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) + } + if deps.LW == nil { + wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) + + // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper + // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably + // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) + deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{}, deps.Stor, deps.LocalStore, deps.Si, nil, wstates) + } + if len(deps.Maddrs) == 0 { + for _, s := range deps.Cfg.Addresses.MinerAddresses { + addr, err := address.NewFromString(s) + if err != nil { + return err + } + deps.Maddrs = append(deps.Maddrs, dtypes.MinerAddress(addr)) + } + } + return nil +} + +func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) { + lp := config.DefaultLotusProvider() + have := []string{} + layers := cctx.StringSlice("layers") + for _, layer := range layers { + text := "" + err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) + if err != nil { + if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { + return nil, fmt.Errorf("missing layer '%s' ", layer) + } + if layer == "base" { + return nil, errors.New(`lotus-provider defaults to a layer named 'base'. + Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`) + } + return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) + } + meta, err := toml.Decode(text, &lp) + if err != nil { + return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) + } + for _, k := range meta.Keys() { + have = append(have, strings.Join(k, " ")) + } + } + _ = have // FUTURE: verify that required fields are here. + // If config includes 3rd-party config, consider JSONSchema as a way that + // 3rd-parties can dynamically include config requirements and we can + // validate the config. Because of layering, we must validate @ startup. + return lp, nil +} diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index 19cc6f5f9..48ff70122 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/tracing" "github.com/filecoin-project/lotus/node/repo" @@ -131,7 +132,7 @@ func main() { Value: "base", }, &cli.StringFlag{ - Name: FlagRepoPath, + Name: deps.FlagRepoPath, EnvVars: []string{"LOTUS_REPO_PATH"}, Value: "~/.lotusprovider", }, @@ -144,7 +145,7 @@ func main() { After: func(c *cli.Context) error { if r := recover(); r != nil { // Generate report in LOTUS_PATH and re-raise panic - build.GeneratePanicReport(c.String("panic-reports"), c.String(FlagRepoPath), c.App.Name) + build.GeneratePanicReport(c.String("panic-reports"), c.String(deps.FlagRepoPath), c.App.Name) panic(r) } return nil @@ -154,7 +155,3 @@ func main() { app.Metadata["repoType"] = repo.Provider lcli.RunApp(app) } - -const ( - FlagRepoPath = "repo-path" -) diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index 577b5b5f9..001108712 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/provider" ) @@ -62,18 +63,18 @@ var wdPostTaskCmd = &cli.Command{ Action: func(cctx *cli.Context) error { ctx := context.Background() - deps, err := getDeps(ctx, cctx) + deps, err := deps.GetDeps(ctx, cctx) if err != nil { return err } - ts, err := deps.full.ChainHead(ctx) + ts, err := deps.Full.ChainHead(ctx) if err != nil { return xerrors.Errorf("cannot get chainhead %w", err) } ht := ts.Height() - addr, err := address.NewFromString(deps.cfg.Addresses.MinerAddresses[0]) + addr, err := address.NewFromString(deps.Cfg.Addresses.MinerAddresses[0]) if err != nil { return xerrors.Errorf("cannot get miner address %w", err) } @@ -82,7 +83,7 @@ var wdPostTaskCmd = &cli.Command{ return xerrors.Errorf("cannot get miner id %w", err) } var id int64 - _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + _, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) if err != nil { log.Error("inserting harmony_task: ", err) @@ -108,7 +109,7 @@ var wdPostTaskCmd = &cli.Command{ var result sql.NullString for { time.Sleep(time.Second) - err = deps.db.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, id).Scan(&result) + err = deps.DB.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, id).Scan(&result) if err != nil { return xerrors.Errorf("reading result from harmony_test: %w", err) } @@ -157,29 +158,29 @@ It will not send any messages to the chain. Since it can compute any deadline, o Action: func(cctx *cli.Context) error { ctx := context.Background() - deps, err := getDeps(ctx, cctx) + deps, err := deps.GetDeps(ctx, cctx) if err != nil { return err } - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, nil, - deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks) + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, + deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err } _, _ = wdPoStSubmitTask, derlareRecoverTask - if len(deps.maddrs) == 0 { + if len(deps.Maddrs) == 0 { return errors.New("no miners to compute WindowPoSt for") } - head, err := deps.full.ChainHead(ctx) + head, err := deps.Full.ChainHead(ctx) if err != nil { return xerrors.Errorf("failed to get chain head: %w", err) } di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0) - for _, maddr := range deps.maddrs { + for _, maddr := range deps.Maddrs { out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition")) if err != nil { fmt.Println("Error computing WindowPoSt for miner", maddr, err) diff --git a/cmd/lotus-provider/rpc/rpc.go b/cmd/lotus-provider/rpc/rpc.go index 3ae3e2a1f..0fe1039d9 100644 --- a/cmd/lotus-provider/rpc/rpc.go +++ b/cmd/lotus-provider/rpc/rpc.go @@ -1,21 +1,32 @@ +// Package rpc provides all direct access to this node. package rpc import ( "context" + "encoding/base64" + "encoding/json" + "net" "net/http" + "time" + "github.com/gbrlsnchs/jwt/v3" "github.com/gorilla/mux" + logging "github.com/ipfs/go-log/v2" + "go.opencensus.io/tag" + "golang.org/x/xerrors" - // logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/rpcenc" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" + "github.com/filecoin-project/lotus/storage/paths" ) -//var log = logging.Logger("lp/rpc") +var log = logging.Logger("lp/rpc") func LotusProviderHandler( authv func(ctx context.Context, token string) ([]auth.Permission, error), @@ -49,3 +60,79 @@ func LotusProviderHandler( } return ah } + +type ProviderAPI struct { + *deps.Deps + ShutdownChan chan struct{} +} + +func (p *ProviderAPI) Version(context.Context) (api.Version, error) { + return api.ProviderAPIVersion0, nil +} + +// Trigger shutdown +func (p *ProviderAPI) Shutdown(context.Context) error { + close(p.ShutdownChan) + return nil +} + +func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error { + + fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}} + remoteHandler := func(w http.ResponseWriter, r *http.Request) { + if !auth.HasPerm(r.Context(), nil, api.PermAdmin) { + w.WriteHeader(401) + _ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"}) + return + } + + fh.ServeHTTP(w, r) + } + // local APIs + { + // debugging + mux := mux.NewRouter() + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + mux.PathPrefix("/remote").HandlerFunc(remoteHandler) + } + + var authVerify func(context.Context, string) ([]auth.Permission, error) + { + privateKey, err := base64.StdEncoding.DecodeString(dependencies.Cfg.Apis.StorageRPCSecret) + if err != nil { + return xerrors.Errorf("decoding storage rpc secret: %w", err) + } + authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) { + var payload deps.JwtPayload + if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil { + return nil, xerrors.Errorf("JWT Verification failed: %w", err) + } + + return payload.Allow, nil + } + } + // Serve the RPC. + srv := &http.Server{ + Handler: LotusProviderHandler( + authVerify, + remoteHandler, + &ProviderAPI{dependencies, shutdownChan}, + true), + ReadHeaderTimeout: time.Minute * 3, + BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) + return ctx + }, + } + + go func() { + <-ctx.Done() + log.Warn("Shutting down...") + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + + return nil +} diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index bf19ee537..87d073963 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -2,54 +2,23 @@ package main import ( "context" - "encoding/base64" - "encoding/json" "fmt" - "net" - "net/http" "os" - "strings" "time" - "github.com/gbrlsnchs/jwt/v3" - "github.com/gorilla/mux" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/pkg/errors" - "github.com/samber/lo" "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/go-statestore" - - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" - cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/cmd/lotus-provider/rpc" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/journal/alerting" - "github.com/filecoin-project/lotus/journal/fsjournal" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/cmd/lotus-provider/tasks" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/provider" - "github.com/filecoin-project/lotus/provider/lpmessage" - "github.com/filecoin-project/lotus/provider/lpwinning" - "github.com/filecoin-project/lotus/storage/ctladdr" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer" - "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" - "github.com/filecoin-project/lotus/storage/sealer/storiface" ) type stackTracer interface { @@ -144,112 +113,22 @@ var runCmd = &cli.Command{ } } - deps, err := getDeps(ctx, cctx) - - if err != nil { - return err - } - cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore - - var activeTasks []harmonytask.TaskInterface - - sender, sendTask := lpmessage.NewSender(full, full, db) - activeTasks = append(activeTasks, sendTask) - - /////////////////////////////////////////////////////////////////////// - ///// Task Selection - /////////////////////////////////////////////////////////////////////// - { - - if cfg.Subsystems.EnableWindowPost { - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, - as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) - if err != nil { - return err - } - activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) - } - - if cfg.Subsystems.EnableWinningPost { - winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) - activeTasks = append(activeTasks, winPoStTask) - } - } - log.Infow("This lotus_provider instance handles", - "miner_addresses", maddrs, - "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) - - taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr) + var dependencies *deps.Deps + err = dependencies.PopulateRemainingDeps(ctx, cctx, true) if err != nil { return err } + taskEngine, err := tasks.StartTasks(ctx, dependencies) + if err != nil { + return nil + } defer taskEngine.GracefullyTerminate(time.Hour) - fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}} - remoteHandler := func(w http.ResponseWriter, r *http.Request) { - if !auth.HasPerm(r.Context(), nil, api.PermAdmin) { - w.WriteHeader(401) - _ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"}) - return - } - - fh.ServeHTTP(w, r) + err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown. + if err != nil { + return err } - // local APIs - { - // debugging - mux := mux.NewRouter() - mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof - mux.PathPrefix("/remote").HandlerFunc(remoteHandler) - - /*ah := &auth.Handler{ - Verify: authv, - Next: mux.ServeHTTP, - }*/ // todo - - } - - var authVerify func(context.Context, string) ([]auth.Permission, error) - { - privateKey, err := base64.StdEncoding.DecodeString(deps.cfg.Apis.StorageRPCSecret) - if err != nil { - return xerrors.Errorf("decoding storage rpc secret: %w", err) - } - authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) { - var payload jwtPayload - if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil { - return nil, xerrors.Errorf("JWT Verification failed: %w", err) - } - - return payload.Allow, nil - } - } - // Serve the RPC. - srv := &http.Server{ - Handler: rpc.LotusProviderHandler( - authVerify, - remoteHandler, - &ProviderAPI{deps, shutdownChan}, - true), - ReadHeaderTimeout: time.Minute * 3, - BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) - return ctx - }, - } - - go func() { - <-ctx.Done() - log.Warn("Shutting down...") - if err := srv.Shutdown(context.TODO()); err != nil { - log.Errorf("shutting down RPC server failed: %s", err) - } - log.Warn("Graceful shutdown successful") - }() - - // Monitor for shutdown. - // TODO provide a graceful shutdown API on shutdownChan finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, //node.ShutdownHandler{Component: "provider", StopFunc: stop}, @@ -257,203 +136,3 @@ var runCmd = &cli.Command{ return nil }, } - -func makeDB(cctx *cli.Context) (*harmonydb.DB, error) { - dbConfig := config.HarmonyDB{ - Username: cctx.String("db-user"), - Password: cctx.String("db-password"), - Hosts: strings.Split(cctx.String("db-host"), ","), - Database: cctx.String("db-name"), - Port: cctx.String("db-port"), - } - return harmonydb.NewFromConfig(dbConfig) -} - -type jwtPayload struct { - Allow []auth.Permission -} - -func StorageAuth(apiKey string) (sealer.StorageAuth, error) { - if apiKey == "" { - return nil, xerrors.Errorf("no api key provided") - } - - rawKey, err := base64.StdEncoding.DecodeString(apiKey) - if err != nil { - return nil, xerrors.Errorf("decoding api key: %w", err) - } - - key := jwt.NewHS256(rawKey) - - p := jwtPayload{ - Allow: []auth.Permission{"admin"}, - } - - token, err := jwt.Sign(&p, key) - if err != nil { - return nil, err - } - - headers := http.Header{} - headers.Add("Authorization", "Bearer "+string(token)) - return sealer.StorageAuth(headers), nil -} - -type Deps struct { - cfg *config.LotusProviderConfig - db *harmonydb.DB - full api.FullNode - verif storiface.Verifier - lw *sealer.LocalWorker - as *ctladdr.AddressSelector - maddrs []dtypes.MinerAddress - stor *paths.Remote - si *paths.DBIndex - localStore *paths.Local - listenAddr string -} - -func getDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) { - // Open repo - - repoPath := cctx.String(FlagRepoPath) - fmt.Println("repopath", repoPath) - r, err := repo.NewFS(repoPath) - if err != nil { - return nil, err - } - - ok, err := r.Exists() - if err != nil { - return nil, err - } - if !ok { - if err := r.Init(repo.Provider); err != nil { - return nil, err - } - } - - db, err := makeDB(cctx) - if err != nil { - return nil, err - } - - /////////////////////////////////////////////////////////////////////// - ///// Dependency Setup - /////////////////////////////////////////////////////////////////////// - - // The config feeds into task runners & their helpers - cfg, err := getConfig(cctx, db) - if err != nil { - return nil, err - } - - log.Debugw("config", "config", cfg) - - var verif storiface.Verifier = ffiwrapper.ProofVerifier - - as, err := provider.AddressSelector(&cfg.Addresses)() - if err != nil { - return nil, err - } - - de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents) - if err != nil { - return nil, err - } - j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) - if err != nil { - return nil, err - } - - full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo) - if err != nil { - return nil, err - } - - go func() { - select { - case <-ctx.Done(): - fullCloser() - _ = j.Close() - } - }() - sa, err := StorageAuth(cfg.Apis.StorageRPCSecret) - if err != nil { - return nil, xerrors.Errorf(`'%w' while parsing the config toml's - [Apis] - StorageRPCSecret=%v -Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret) - } - - al := alerting.NewAlertingSystem(j) - si := paths.NewDBIndex(al, db) - bls := &paths.BasicLocalStorage{ - PathToJSON: cctx.String("storage-json"), - } - - listenAddr := cctx.String("listen") - const unspecifiedAddress = "0.0.0.0" - addressSlice := strings.Split(listenAddr, ":") - if ip := net.ParseIP(addressSlice[0]); ip != nil { - if ip.String() == unspecifiedAddress { - rip, err := db.GetRoutableIP() - if err != nil { - return nil, err - } - listenAddr = rip + ":" + addressSlice[1] - } - } - localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"}) - if err != nil { - return nil, err - } - - stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) - - wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) - - // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper - // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably - // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) - lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates) - - var maddrs []dtypes.MinerAddress - for _, s := range cfg.Addresses.MinerAddresses { - addr, err := address.NewFromString(s) - if err != nil { - return nil, err - } - maddrs = append(maddrs, dtypes.MinerAddress(addr)) - } - - return &Deps{ // lint: intentionally not-named so it will fail if one is forgotten - cfg, - db, - full, - verif, - lw, - as, - maddrs, - stor, - si, - localStore, - listenAddr, - }, nil - -} - -type ProviderAPI struct { - *Deps - ShutdownChan chan struct{} -} - -func (p *ProviderAPI) Version(context.Context) (api.Version, error) { - return api.ProviderAPIVersion0, nil -} - -// Trigger shutdown -func (p *ProviderAPI) Shutdown(context.Context) error { - close(p.ShutdownChan) - return nil -} diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go new file mode 100644 index 000000000..18c56a744 --- /dev/null +++ b/cmd/lotus-provider/tasks/tasks.go @@ -0,0 +1,58 @@ +// Package tasks contains tasks that can be run by the lotus-provider command. +package tasks + +import ( + "context" + + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/provider/lpmessage" + "github.com/filecoin-project/lotus/provider/lpwinning" + "github.com/samber/lo" +) + +var log = logging.Logger("lotus-provider/deps") + +func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.TaskEngine, error) { + cfg := dependencies.Cfg + db := dependencies.DB + full := dependencies.Full + verif := dependencies.Verif + lw := dependencies.LW + as := dependencies.As + maddrs := dependencies.Maddrs + stor := dependencies.Stor + si := dependencies.Si + var activeTasks []harmonytask.TaskInterface + + sender, sendTask := lpmessage.NewSender(full, full, db) + activeTasks = append(activeTasks, sendTask) + + /////////////////////////////////////////////////////////////////////// + ///// Task Selection + /////////////////////////////////////////////////////////////////////// + { + + if cfg.Subsystems.EnableWindowPost { + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, + as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) + if err != nil { + return nil, err + } + activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) + } + + if cfg.Subsystems.EnableWinningPost { + winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) + activeTasks = append(activeTasks, winPoStTask) + } + } + log.Infow("This lotus_provider instance handles", + "miner_addresses", maddrs, + "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) + + return harmonytask.New(db, activeTasks, dependencies.ListenAddr) +}