diff --git a/cmd/lotus-provider/config.go b/cmd/lotus-provider/config.go index 5bd681429..49eed327a 100644 --- a/cmd/lotus-provider/config.go +++ b/cmd/lotus-provider/config.go @@ -2,7 +2,6 @@ package main import ( "context" - "database/sql" "errors" "fmt" "io" @@ -14,7 +13,7 @@ import ( "github.com/urfave/cli/v2" "golang.org/x/xerrors" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/node/config" ) @@ -77,7 +76,7 @@ var configSetCmd = &cli.Command{ Action: func(cctx *cli.Context) error { args := cctx.Args() - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -131,7 +130,7 @@ var configGetCmd = &cli.Command{ if args.Len() != 1 { return fmt.Errorf("want 1 layer arg, got %d", args.Len()) } - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -153,7 +152,7 @@ var configListCmd = &cli.Command{ Usage: "List config layers you can get.", Flags: []cli.Flag{}, Action: func(cctx *cli.Context) error { - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -180,7 +179,7 @@ var configRmCmd = &cli.Command{ if args.Len() != 1 { return errors.New("must have exactly 1 arg for the layer name") } - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } @@ -209,11 +208,11 @@ var configViewCmd = &cli.Command{ }, }, Action: func(cctx *cli.Context) error { - db, err := makeDB(cctx) + db, err := deps.MakeDB(cctx) if err != nil { return err } - lp, err := getConfig(cctx, db) + lp, err := deps.GetConfig(cctx, db) if err != nil { return err } @@ -225,35 +224,3 @@ var configViewCmd = &cli.Command{ return nil }, } - -func getConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) { - lp := config.DefaultLotusProvider() - have := []string{} - layers := cctx.StringSlice("layers") - for _, layer := range layers { - text := "" - err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - if err != nil { - if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { - return nil, fmt.Errorf("missing layer '%s' ", layer) - } - if layer == "base" { - return nil, errors.New(`lotus-provider defaults to a layer named 'base'. - Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`) - } - return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) - } - meta, err := toml.Decode(text, &lp) - if err != nil { - return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) - } - for _, k := range meta.Keys() { - have = append(have, strings.Join(k, " ")) - } - } - _ = have // FUTURE: verify that required fields are here. - // If config includes 3rd-party config, consider JSONSchema as a way that - // 3rd-parties can dynamically include config requirements and we can - // validate the config. Because of layering, we must validate @ startup. - return lp, nil -} diff --git a/cmd/lotus-provider/deps/deps.go b/cmd/lotus-provider/deps/deps.go new file mode 100644 index 000000000..499cc7bec --- /dev/null +++ b/cmd/lotus-provider/deps/deps.go @@ -0,0 +1,276 @@ +// Package deps provides the dependencies for the lotus provider node. +package deps + +import ( + "context" + "database/sql" + "encoding/base64" + "errors" + "fmt" + "net" + "net/http" + "strings" + + "github.com/BurntSushi/toml" + "github.com/gbrlsnchs/jwt/v3" + ds "github.com/ipfs/go-datastore" + dssync "github.com/ipfs/go-datastore/sync" + logging "github.com/ipfs/go-log/v2" + "github.com/urfave/cli/v2" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-jsonrpc/auth" + "github.com/filecoin-project/go-statestore" + + "github.com/filecoin-project/lotus/api" + cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/journal" + "github.com/filecoin-project/lotus/journal/alerting" + "github.com/filecoin-project/lotus/journal/fsjournal" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" + "github.com/filecoin-project/lotus/node/config" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/lotus/node/repo" + "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/storage/ctladdr" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer" + "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +var log = logging.Logger("lotus-provider/deps") + +func MakeDB(cctx *cli.Context) (*harmonydb.DB, error) { + dbConfig := config.HarmonyDB{ + Username: cctx.String("db-user"), + Password: cctx.String("db-password"), + Hosts: strings.Split(cctx.String("db-host"), ","), + Database: cctx.String("db-name"), + Port: cctx.String("db-port"), + } + return harmonydb.NewFromConfig(dbConfig) +} + +type JwtPayload struct { + Allow []auth.Permission +} + +func StorageAuth(apiKey string) (sealer.StorageAuth, error) { + if apiKey == "" { + return nil, xerrors.Errorf("no api key provided") + } + + rawKey, err := base64.StdEncoding.DecodeString(apiKey) + if err != nil { + return nil, xerrors.Errorf("decoding api key: %w", err) + } + + key := jwt.NewHS256(rawKey) + + p := JwtPayload{ + Allow: []auth.Permission{"admin"}, + } + + token, err := jwt.Sign(&p, key) + if err != nil { + return nil, err + } + + headers := http.Header{} + headers.Add("Authorization", "Bearer "+string(token)) + return sealer.StorageAuth(headers), nil +} + +func GetDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) { + var deps Deps + return &deps, deps.PopulateRemainingDeps(ctx, cctx, true) +} + +type Deps struct { + Cfg *config.LotusProviderConfig + DB *harmonydb.DB + Full api.FullNode + Verif storiface.Verifier + LW *sealer.LocalWorker + As *ctladdr.AddressSelector + Maddrs []dtypes.MinerAddress + Stor *paths.Remote + Si *paths.DBIndex + LocalStore *paths.Local + ListenAddr string +} + +const ( + FlagRepoPath = "repo-path" +) + +func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, makeRepo bool) error { + var err error + if makeRepo { + // Open repo + repoPath := cctx.String(FlagRepoPath) + fmt.Println("repopath", repoPath) + r, err := repo.NewFS(repoPath) + if err != nil { + return err + } + + ok, err := r.Exists() + if err != nil { + return err + } + if !ok { + if err := r.Init(repo.Provider); err != nil { + return err + } + } + } + + if deps.Cfg == nil { + deps.DB, err = MakeDB(cctx) + if err != nil { + return err + } + } + + if deps.Cfg == nil { + // The config feeds into task runners & their helpers + deps.Cfg, err = GetConfig(cctx, deps.DB) + if err != nil { + return err + } + } + + log.Debugw("config", "config", deps.Cfg) + + if deps.Verif == nil { + deps.Verif = ffiwrapper.ProofVerifier + } + + if deps.As == nil { + deps.As, err = provider.AddressSelector(&deps.Cfg.Addresses)() + if err != nil { + return err + } + } + + if deps.Si == nil { + de, err := journal.ParseDisabledEvents(deps.Cfg.Journal.DisabledEvents) + if err != nil { + return err + } + j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) + if err != nil { + return err + } + go func() { + <-ctx.Done() + _ = j.Close() + }() + + al := alerting.NewAlertingSystem(j) + deps.Si = paths.NewDBIndex(al, deps.DB) + } + + if deps.Full == nil { + var fullCloser func() + deps.Full, fullCloser, err = cliutil.GetFullNodeAPIV1LotusProvider(cctx, deps.Cfg.Apis.ChainApiInfo) + if err != nil { + return err + } + + go func() { + <-ctx.Done() + fullCloser() + }() + } + + bls := &paths.BasicLocalStorage{ + PathToJSON: cctx.String("storage-json"), + } + + if deps.ListenAddr == "" { + listenAddr := cctx.String("listen") + const unspecifiedAddress = "0.0.0.0" + addressSlice := strings.Split(listenAddr, ":") + if ip := net.ParseIP(addressSlice[0]); ip != nil { + if ip.String() == unspecifiedAddress { + rip, err := deps.DB.GetRoutableIP() + if err != nil { + return err + } + deps.ListenAddr = rip + ":" + addressSlice[1] + } + } + } + if deps.LocalStore == nil { + deps.LocalStore, err = paths.NewLocal(ctx, bls, deps.Si, []string{"http://" + deps.ListenAddr + "/remote"}) + if err != nil { + return err + } + } + + sa, err := StorageAuth(deps.Cfg.Apis.StorageRPCSecret) + if err != nil { + return xerrors.Errorf(`'%w' while parsing the config toml's + [Apis] + StorageRPCSecret=%v +Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, deps.Cfg.Apis.StorageRPCSecret) + } + if deps.Stor == nil { + deps.Stor = paths.NewRemote(deps.LocalStore, deps.Si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) + } + if deps.LW == nil { + wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) + + // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper + // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably + // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) + deps.LW = sealer.NewLocalWorker(sealer.WorkerConfig{}, deps.Stor, deps.LocalStore, deps.Si, nil, wstates) + } + if len(deps.Maddrs) == 0 { + for _, s := range deps.Cfg.Addresses.MinerAddresses { + addr, err := address.NewFromString(s) + if err != nil { + return err + } + deps.Maddrs = append(deps.Maddrs, dtypes.MinerAddress(addr)) + } + } + fmt.Println("last line of populate") + return nil +} + +func GetConfig(cctx *cli.Context, db *harmonydb.DB) (*config.LotusProviderConfig, error) { + lp := config.DefaultLotusProvider() + have := []string{} + layers := cctx.StringSlice("layers") + for _, layer := range layers { + text := "" + err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) + if err != nil { + if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { + return nil, fmt.Errorf("missing layer '%s' ", layer) + } + if layer == "base" { + return nil, errors.New(`lotus-provider defaults to a layer named 'base'. + Either use 'migrate' command or edit a base.toml and upload it with: lotus-provider config set base.toml`) + } + return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) + } + meta, err := toml.Decode(text, &lp) + if err != nil { + return nil, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) + } + for _, k := range meta.Keys() { + have = append(have, strings.Join(k, " ")) + } + } + _ = have // FUTURE: verify that required fields are here. + // If config includes 3rd-party config, consider JSONSchema as a way that + // 3rd-parties can dynamically include config requirements and we can + // validate the config. Because of layering, we must validate @ startup. + return lp, nil +} diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index 19cc6f5f9..24126a8d7 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -5,16 +5,18 @@ import ( "fmt" "os" "os/signal" - "runtime/debug" + "runtime/pprof" "syscall" "github.com/fatih/color" logging "github.com/ipfs/go-log/v2" + "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/lotuslog" "github.com/filecoin-project/lotus/lib/tracing" "github.com/filecoin-project/lotus/node/repo" @@ -28,8 +30,8 @@ func SetupCloseHandler() { go func() { <-c fmt.Println("\r- Ctrl+C pressed in Terminal") - debug.PrintStack() - os.Exit(1) + _ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1) + panic(1) }() } @@ -131,7 +133,7 @@ func main() { Value: "base", }, &cli.StringFlag{ - Name: FlagRepoPath, + Name: deps.FlagRepoPath, EnvVars: []string{"LOTUS_REPO_PATH"}, Value: "~/.lotusprovider", }, @@ -143,8 +145,14 @@ func main() { }, After: func(c *cli.Context) error { if r := recover(); r != nil { + p, err := homedir.Expand(c.String(FlagMinerRepo)) + if err != nil { + log.Errorw("could not expand repo path for panic report", "error", err) + panic(r) + } + // Generate report in LOTUS_PATH and re-raise panic - build.GeneratePanicReport(c.String("panic-reports"), c.String(FlagRepoPath), c.App.Name) + build.GeneratePanicReport(c.String("panic-reports"), p, c.App.Name) panic(r) } return nil @@ -154,7 +162,3 @@ func main() { app.Metadata["repoType"] = repo.Provider lcli.RunApp(app) } - -const ( - FlagRepoPath = "repo-path" -) diff --git a/cmd/lotus-provider/proving.go b/cmd/lotus-provider/proving.go index c1ad8dec8..379bfdf85 100644 --- a/cmd/lotus-provider/proving.go +++ b/cmd/lotus-provider/proving.go @@ -15,6 +15,7 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/provider" ) @@ -62,18 +63,18 @@ var wdPostTaskCmd = &cli.Command{ Action: func(cctx *cli.Context) error { ctx := context.Background() - deps, err := getDeps(ctx, cctx) + deps, err := deps.GetDeps(ctx, cctx) if err != nil { return err } - ts, err := deps.full.ChainHead(ctx) + ts, err := deps.Full.ChainHead(ctx) if err != nil { return xerrors.Errorf("cannot get chainhead %w", err) } ht := ts.Height() - addr, err := address.NewFromString(deps.cfg.Addresses.MinerAddresses[0]) + addr, err := address.NewFromString(deps.Cfg.Addresses.MinerAddresses[0]) if err != nil { return xerrors.Errorf("cannot get miner address %w", err) } @@ -82,9 +83,10 @@ var wdPostTaskCmd = &cli.Command{ return xerrors.Errorf("cannot get miner id %w", err) } var id int64 + retryDelay := time.Millisecond * 10 retryAddTask: - _, err = deps.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + _, err = deps.DB.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { err = tx.QueryRow(`INSERT INTO harmony_task (name, posted_time, added_by) VALUES ('WdPost', CURRENT_TIMESTAMP, 123) RETURNING id`).Scan(&id) if err != nil { log.Error("inserting harmony_task: ", err) @@ -115,7 +117,7 @@ var wdPostTaskCmd = &cli.Command{ var result sql.NullString for { time.Sleep(time.Second) - err = deps.db.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, id).Scan(&result) + err = deps.DB.QueryRow(ctx, `SELECT result FROM harmony_test WHERE task_id=$1`, id).Scan(&result) if err != nil { return xerrors.Errorf("reading result from harmony_test: %w", err) } @@ -164,29 +166,29 @@ It will not send any messages to the chain. Since it can compute any deadline, o Action: func(cctx *cli.Context) error { ctx := context.Background() - deps, err := getDeps(ctx, cctx) + deps, err := deps.GetDeps(ctx, cctx) if err != nil { return err } - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.cfg.Fees, deps.cfg.Proving, deps.full, deps.verif, deps.lw, nil, - deps.as, deps.maddrs, deps.db, deps.stor, deps.si, deps.cfg.Subsystems.WindowPostMaxTasks) + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, deps.Cfg.Fees, deps.Cfg.Proving, deps.Full, deps.Verif, deps.LW, nil, + deps.As, deps.Maddrs, deps.DB, deps.Stor, deps.Si, deps.Cfg.Subsystems.WindowPostMaxTasks) if err != nil { return err } _, _ = wdPoStSubmitTask, derlareRecoverTask - if len(deps.maddrs) == 0 { + if len(deps.Maddrs) == 0 { return errors.New("no miners to compute WindowPoSt for") } - head, err := deps.full.ChainHead(ctx) + head, err := deps.Full.ChainHead(ctx) if err != nil { return xerrors.Errorf("failed to get chain head: %w", err) } di := dline.NewInfo(head.Height(), cctx.Uint64("deadline"), 0, 0, 0, 10 /*challenge window*/, 0, 0) - for _, maddr := range deps.maddrs { + for _, maddr := range deps.Maddrs { out, err := wdPostTask.DoPartition(ctx, head, address.Address(maddr), di, cctx.Uint64("partition")) if err != nil { fmt.Println("Error computing WindowPoSt for miner", maddr, err) diff --git a/cmd/lotus-provider/rpc/rpc.go b/cmd/lotus-provider/rpc/rpc.go index 3ae3e2a1f..1f075f79a 100644 --- a/cmd/lotus-provider/rpc/rpc.go +++ b/cmd/lotus-provider/rpc/rpc.go @@ -1,21 +1,32 @@ +// Package rpc provides all direct access to this node. package rpc import ( "context" + "encoding/base64" + "encoding/json" + "net" "net/http" + "time" + "github.com/gbrlsnchs/jwt/v3" "github.com/gorilla/mux" + logging "github.com/ipfs/go-log/v2" + "go.opencensus.io/tag" + "golang.org/x/xerrors" - // logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/go-jsonrpc" "github.com/filecoin-project/go-jsonrpc/auth" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/lib/rpcenc" + "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/metrics/proxy" + "github.com/filecoin-project/lotus/storage/paths" ) -//var log = logging.Logger("lp/rpc") +var log = logging.Logger("lp/rpc") func LotusProviderHandler( authv func(ctx context.Context, token string) ([]auth.Permission, error), @@ -49,3 +60,81 @@ func LotusProviderHandler( } return ah } + +type ProviderAPI struct { + *deps.Deps + ShutdownChan chan struct{} +} + +func (p *ProviderAPI) Version(context.Context) (api.Version, error) { + return api.ProviderAPIVersion0, nil +} + +// Trigger shutdown +func (p *ProviderAPI) Shutdown(context.Context) error { + close(p.ShutdownChan) + return nil +} + +func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error { + + fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}} + remoteHandler := func(w http.ResponseWriter, r *http.Request) { + if !auth.HasPerm(r.Context(), nil, api.PermAdmin) { + w.WriteHeader(401) + _ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"}) + return + } + + fh.ServeHTTP(w, r) + } + // local APIs + { + // debugging + mux := mux.NewRouter() + mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof + mux.PathPrefix("/remote").HandlerFunc(remoteHandler) + } + + var authVerify func(context.Context, string) ([]auth.Permission, error) + { + privateKey, err := base64.StdEncoding.DecodeString(dependencies.Cfg.Apis.StorageRPCSecret) + if err != nil { + return xerrors.Errorf("decoding storage rpc secret: %w", err) + } + authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) { + var payload deps.JwtPayload + if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil { + return nil, xerrors.Errorf("JWT Verification failed: %w", err) + } + + return payload.Allow, nil + } + } + // Serve the RPC. + srv := &http.Server{ + Handler: LotusProviderHandler( + authVerify, + remoteHandler, + &ProviderAPI{dependencies, shutdownChan}, + true), + ReadHeaderTimeout: time.Minute * 3, + BaseContext: func(listener net.Listener) context.Context { + ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) + return ctx + }, + Addr: dependencies.ListenAddr, + } + + go func() { + <-ctx.Done() + log.Warn("Shutting down...") + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + + log.Infof("Setting up RPC server at %s", dependencies.ListenAddr) + return srv.ListenAndServe() +} diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index de97aa766..8e7d5ad79 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -2,54 +2,23 @@ package main import ( "context" - "encoding/base64" - "encoding/json" "fmt" - "net" - "net/http" "os" - "strings" "time" - "github.com/gbrlsnchs/jwt/v3" - "github.com/gorilla/mux" - ds "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/pkg/errors" - "github.com/samber/lo" "github.com/urfave/cli/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" - "golang.org/x/xerrors" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-jsonrpc/auth" - "github.com/filecoin-project/go-statestore" - - "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" lcli "github.com/filecoin-project/lotus/cli" - cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/cmd/lotus-provider/rpc" - "github.com/filecoin-project/lotus/journal" - "github.com/filecoin-project/lotus/journal/alerting" - "github.com/filecoin-project/lotus/journal/fsjournal" - "github.com/filecoin-project/lotus/lib/harmony/harmonydb" - "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/cmd/lotus-provider/tasks" "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/config" - "github.com/filecoin-project/lotus/node/modules/dtypes" - "github.com/filecoin-project/lotus/node/repo" - "github.com/filecoin-project/lotus/provider" - "github.com/filecoin-project/lotus/provider/lpmessage" - "github.com/filecoin-project/lotus/provider/lpwinning" - "github.com/filecoin-project/lotus/storage/ctladdr" - "github.com/filecoin-project/lotus/storage/paths" - "github.com/filecoin-project/lotus/storage/sealer" - "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper" - "github.com/filecoin-project/lotus/storage/sealer/storiface" ) type stackTracer interface { @@ -144,112 +113,28 @@ var runCmd = &cli.Command{ } } - deps, err := getDeps(ctx, cctx) - + fmt.Println("before populateRemainingDeps") + dependencies := &deps.Deps{} + err = dependencies.PopulateRemainingDeps(ctx, cctx, true) + fmt.Println("after popdeps") if err != nil { + fmt.Println("err", err) return err } - cfg, db, full, verif, lw, as, maddrs, stor, si, localStore := deps.cfg, deps.db, deps.full, deps.verif, deps.lw, deps.as, deps.maddrs, deps.stor, deps.si, deps.localStore + fmt.Println("ef") - var activeTasks []harmonytask.TaskInterface + taskEngine, err := tasks.StartTasks(ctx, dependencies) + fmt.Println("gh") - sender, sendTask := lpmessage.NewSender(full, full, db) - activeTasks = append(activeTasks, sendTask) - - /////////////////////////////////////////////////////////////////////// - ///// Task Selection - /////////////////////////////////////////////////////////////////////// - { - - if cfg.Subsystems.EnableWindowPost { - wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, - as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) - if err != nil { - return err - } - activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) - } - - if cfg.Subsystems.EnableWinningPost { - winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) - activeTasks = append(activeTasks, winPoStTask) - } - } - log.Infow("This lotus_provider instance handles", - "miner_addresses", minerAddressesToStrings(maddrs), - "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) - - taskEngine, err := harmonytask.New(db, activeTasks, deps.listenAddr) if err != nil { - return err + return nil } - defer taskEngine.GracefullyTerminate(time.Hour) - fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}} - remoteHandler := func(w http.ResponseWriter, r *http.Request) { - if !auth.HasPerm(r.Context(), nil, api.PermAdmin) { - w.WriteHeader(401) - _ = json.NewEncoder(w).Encode(struct{ Error string }{"unauthorized: missing admin permission"}) - return - } - - fh.ServeHTTP(w, r) + err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown. + if err != nil { + return err } - // local APIs - { - // debugging - mux := mux.NewRouter() - mux.PathPrefix("/").Handler(http.DefaultServeMux) // pprof - mux.PathPrefix("/remote").HandlerFunc(remoteHandler) - - /*ah := &auth.Handler{ - Verify: authv, - Next: mux.ServeHTTP, - }*/ // todo - - } - - var authVerify func(context.Context, string) ([]auth.Permission, error) - { - privateKey, err := base64.StdEncoding.DecodeString(deps.cfg.Apis.StorageRPCSecret) - if err != nil { - return xerrors.Errorf("decoding storage rpc secret: %w", err) - } - authVerify = func(ctx context.Context, token string) ([]auth.Permission, error) { - var payload jwtPayload - if _, err := jwt.Verify([]byte(token), jwt.NewHS256(privateKey), &payload); err != nil { - return nil, xerrors.Errorf("JWT Verification failed: %w", err) - } - - return payload.Allow, nil - } - } - // Serve the RPC. - srv := &http.Server{ - Handler: rpc.LotusProviderHandler( - authVerify, - remoteHandler, - &ProviderAPI{deps, shutdownChan}, - true), - ReadHeaderTimeout: time.Minute * 3, - BaseContext: func(listener net.Listener) context.Context { - ctx, _ := tag.New(context.Background(), tag.Upsert(metrics.APIInterface, "lotus-worker")) - return ctx - }, - } - - go func() { - <-ctx.Done() - log.Warn("Shutting down...") - if err := srv.Shutdown(context.TODO()); err != nil { - log.Errorf("shutting down RPC server failed: %s", err) - } - log.Warn("Graceful shutdown successful") - }() - - // Monitor for shutdown. - // TODO provide a graceful shutdown API on shutdownChan finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, //node.ShutdownHandler{Component: "provider", StopFunc: stop}, @@ -257,211 +142,3 @@ var runCmd = &cli.Command{ return nil }, } - -func makeDB(cctx *cli.Context) (*harmonydb.DB, error) { - dbConfig := config.HarmonyDB{ - Username: cctx.String("db-user"), - Password: cctx.String("db-password"), - Hosts: strings.Split(cctx.String("db-host"), ","), - Database: cctx.String("db-name"), - Port: cctx.String("db-port"), - } - return harmonydb.NewFromConfig(dbConfig) -} - -type jwtPayload struct { - Allow []auth.Permission -} - -func StorageAuth(apiKey string) (sealer.StorageAuth, error) { - if apiKey == "" { - return nil, xerrors.Errorf("no api key provided") - } - - rawKey, err := base64.StdEncoding.DecodeString(apiKey) - if err != nil { - return nil, xerrors.Errorf("decoding api key: %w", err) - } - - key := jwt.NewHS256(rawKey) - - p := jwtPayload{ - Allow: []auth.Permission{"admin"}, - } - - token, err := jwt.Sign(&p, key) - if err != nil { - return nil, err - } - - headers := http.Header{} - headers.Add("Authorization", "Bearer "+string(token)) - return sealer.StorageAuth(headers), nil -} - -type Deps struct { - cfg *config.LotusProviderConfig - db *harmonydb.DB - full api.FullNode - verif storiface.Verifier - lw *sealer.LocalWorker - as *ctladdr.AddressSelector - maddrs []dtypes.MinerAddress - stor *paths.Remote - si *paths.DBIndex - localStore *paths.Local - listenAddr string -} - -func getDeps(ctx context.Context, cctx *cli.Context) (*Deps, error) { - // Open repo - - repoPath := cctx.String(FlagRepoPath) - fmt.Println("repopath", repoPath) - r, err := repo.NewFS(repoPath) - if err != nil { - return nil, err - } - - ok, err := r.Exists() - if err != nil { - return nil, err - } - if !ok { - if err := r.Init(repo.Provider); err != nil { - return nil, err - } - } - - db, err := makeDB(cctx) - if err != nil { - return nil, err - } - - /////////////////////////////////////////////////////////////////////// - ///// Dependency Setup - /////////////////////////////////////////////////////////////////////// - - // The config feeds into task runners & their helpers - cfg, err := getConfig(cctx, db) - if err != nil { - return nil, err - } - - log.Debugw("config", "config", cfg) - - var verif storiface.Verifier = ffiwrapper.ProofVerifier - - as, err := provider.AddressSelector(&cfg.Addresses)() - if err != nil { - return nil, err - } - - de, err := journal.ParseDisabledEvents(cfg.Journal.DisabledEvents) - if err != nil { - return nil, err - } - j, err := fsjournal.OpenFSJournalPath(cctx.String("journal"), de) - if err != nil { - return nil, err - } - - full, fullCloser, err := cliutil.GetFullNodeAPIV1LotusProvider(cctx, cfg.Apis.ChainApiInfo) - if err != nil { - return nil, err - } - - go func() { - select { - case <-ctx.Done(): - fullCloser() - _ = j.Close() - } - }() - sa, err := StorageAuth(cfg.Apis.StorageRPCSecret) - if err != nil { - return nil, xerrors.Errorf(`'%w' while parsing the config toml's - [Apis] - StorageRPCSecret=%v -Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, err, cfg.Apis.StorageRPCSecret) - } - - al := alerting.NewAlertingSystem(j) - si := paths.NewDBIndex(al, db) - bls := &paths.BasicLocalStorage{ - PathToJSON: cctx.String("storage-json"), - } - - listenAddr := cctx.String("listen") - const unspecifiedAddress = "0.0.0.0" - addressSlice := strings.Split(listenAddr, ":") - if ip := net.ParseIP(addressSlice[0]); ip != nil { - if ip.String() == unspecifiedAddress { - rip, err := db.GetRoutableIP() - if err != nil { - return nil, err - } - listenAddr = rip + ":" + addressSlice[1] - } - } - localStore, err := paths.NewLocal(ctx, bls, si, []string{"http://" + listenAddr + "/remote"}) - if err != nil { - return nil, err - } - - stor := paths.NewRemote(localStore, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{}) - - wstates := statestore.New(dssync.MutexWrap(ds.NewMapDatastore())) - - // todo localWorker isn't the abstraction layer we want to use here, we probably want to go straight to ffiwrapper - // maybe with a lotus-provider specific abstraction. LocalWorker does persistent call tracking which we probably - // don't need (ehh.. maybe we do, the async callback system may actually work decently well with harmonytask) - lw := sealer.NewLocalWorker(sealer.WorkerConfig{}, stor, localStore, si, nil, wstates) - - var maddrs []dtypes.MinerAddress - for _, s := range cfg.Addresses.MinerAddresses { - addr, err := address.NewFromString(s) - if err != nil { - return nil, err - } - maddrs = append(maddrs, dtypes.MinerAddress(addr)) - } - - return &Deps{ // lint: intentionally not-named so it will fail if one is forgotten - cfg, - db, - full, - verif, - lw, - as, - maddrs, - stor, - si, - localStore, - listenAddr, - }, nil - -} - -type ProviderAPI struct { - *Deps - ShutdownChan chan struct{} -} - -func (p *ProviderAPI) Version(context.Context) (api.Version, error) { - return api.ProviderAPIVersion0, nil -} - -// Trigger shutdown -func (p *ProviderAPI) Shutdown(context.Context) error { - close(p.ShutdownChan) - return nil -} - -func minerAddressesToStrings(maddrs []dtypes.MinerAddress) []string { - strs := make([]string, len(maddrs)) - for i, addr := range maddrs { - strs[i] = address.Address(addr).String() - } - return strs -} diff --git a/cmd/lotus-provider/tasks/tasks.go b/cmd/lotus-provider/tasks/tasks.go new file mode 100644 index 000000000..2c4cd58bf --- /dev/null +++ b/cmd/lotus-provider/tasks/tasks.go @@ -0,0 +1,58 @@ +// Package tasks contains tasks that can be run by the lotus-provider command. +package tasks + +import ( + "context" + + logging "github.com/ipfs/go-log/v2" + "github.com/samber/lo" + + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/lib/harmony/harmonytask" + "github.com/filecoin-project/lotus/provider" + "github.com/filecoin-project/lotus/provider/lpmessage" + "github.com/filecoin-project/lotus/provider/lpwinning" +) + +var log = logging.Logger("lotus-provider/deps") + +func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.TaskEngine, error) { + cfg := dependencies.Cfg + db := dependencies.DB + full := dependencies.Full + verif := dependencies.Verif + lw := dependencies.LW + as := dependencies.As + maddrs := dependencies.Maddrs + stor := dependencies.Stor + si := dependencies.Si + var activeTasks []harmonytask.TaskInterface + + sender, sendTask := lpmessage.NewSender(full, full, db) + activeTasks = append(activeTasks, sendTask) + + /////////////////////////////////////////////////////////////////////// + ///// Task Selection + /////////////////////////////////////////////////////////////////////// + { + + if cfg.Subsystems.EnableWindowPost { + wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw, sender, + as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks) + if err != nil { + return nil, err + } + activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask, derlareRecoverTask) + } + + if cfg.Subsystems.EnableWinningPost { + winPoStTask := lpwinning.NewWinPostTask(cfg.Subsystems.WinningPostMaxTasks, db, lw, verif, full, maddrs) + activeTasks = append(activeTasks, winPoStTask) + } + } + log.Infow("This lotus_provider instance handles", + "miner_addresses", maddrs, + "tasks", lo.Map(activeTasks, func(t harmonytask.TaskInterface, _ int) string { return t.TypeDetails().Name })) + + return harmonytask.New(db, activeTasks, dependencies.ListenAddr) +} diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index 3c83ba896..12e2ce8aa 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -20,6 +20,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" + "github.com/urfave/cli/v2" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" @@ -45,6 +46,9 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet/key" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/cmd/lotus-provider/rpc" + "github.com/filecoin-project/lotus/cmd/lotus-provider/tasks" "github.com/filecoin-project/lotus/cmd/lotus-seed/seed" "github.com/filecoin-project/lotus/cmd/lotus-worker/sealworker" "github.com/filecoin-project/lotus/gateway" @@ -120,15 +124,17 @@ type Ensemble struct { options *ensembleOpts inactive struct { - fullnodes []*TestFullNode - miners []*TestMiner - workers []*TestWorker + fullnodes []*TestFullNode + providernodes []*TestProviderNode + miners []*TestMiner + workers []*TestWorker } active struct { - fullnodes []*TestFullNode - miners []*TestMiner - workers []*TestWorker - bms map[*TestMiner]*BlockMiner + fullnodes []*TestFullNode + providernodes []*TestProviderNode + miners []*TestMiner + workers []*TestWorker + bms map[*TestMiner]*BlockMiner } genesis struct { version network.Version @@ -221,6 +227,20 @@ func (n *Ensemble) FullNode(full *TestFullNode, opts ...NodeOpt) *Ensemble { return n } +// FullNode enrolls a new Provider node. +func (n *Ensemble) Provider(lp *TestProviderNode, opts ...NodeOpt) *Ensemble { + options := DefaultNodeOpts + for _, o := range opts { + err := o(&options) + require.NoError(n.t, err) + } + + *lp = TestProviderNode{t: n.t, options: options, Deps: &deps.Deps{}} + + n.inactive.providernodes = append(n.inactive.providernodes, lp) + return n +} + // Miner enrolls a new miner, using the provided full node for chain // interactions. func (n *Ensemble) MinerEnroll(minerNode *TestMiner, full *TestFullNode, opts ...NodeOpt) *Ensemble { @@ -886,6 +906,28 @@ func (n *Ensemble) Start() *Ensemble { // to active, so clear the slice. n.inactive.workers = n.inactive.workers[:0] + for _, p := range n.inactive.providernodes { + + // TODO setup config with options + err := p.Deps.PopulateRemainingDeps(context.Background(), &cli.Context{}, false) + require.NoError(n.t, err) + + shutdownChan := make(chan struct{}) + taskEngine, err := tasks.StartTasks(ctx, p.Deps) + if err != nil { + return nil + } + defer taskEngine.GracefullyTerminate(time.Hour) + + err = rpc.ListenAndServe(ctx, p.Deps, shutdownChan) // Monitor for shutdown. + require.NoError(n.t, err) + finishCh := node.MonitorShutdown(shutdownChan) //node.ShutdownHandler{Component: "rpc server", StopFunc: rpcStopper}, + //node.ShutdownHandler{Component: "provider", StopFunc: stop}, + + <-finishCh + + n.active.providernodes = append(n.active.providernodes, p) + } // --------------------- // MISC // --------------------- diff --git a/itests/kit/ensemble_presets.go b/itests/kit/ensemble_presets.go index 3ec39cf90..68b85fde0 100644 --- a/itests/kit/ensemble_presets.go +++ b/itests/kit/ensemble_presets.go @@ -101,6 +101,21 @@ func EnsembleOneTwo(t *testing.T, opts ...interface{}) (*TestFullNode, *TestMine return &full, &one, &two, ens } +// EnsembleProvider creates and starts an Ensemble with a single full node and a single provider. +// It does not interconnect nodes nor does it begin mining. +func EnsembleProvider(t *testing.T, opts ...interface{}) (*TestFullNode, *TestProviderNode, *Ensemble) { + opts = append(opts, WithAllSubsystems()) + + eopts, nopts := siftOptions(t, opts) + + var ( + full TestFullNode + provider TestProviderNode + ) + ens := NewEnsemble(t, eopts...).FullNode(&full, nopts...).Provider(&provider, nopts...).Start() + return &full, &provider, ens +} + func siftOptions(t *testing.T, opts []interface{}) (eopts []EnsembleOpt, nopts []NodeOpt) { for _, v := range opts { switch o := v.(type) { diff --git a/itests/kit/node_full.go b/itests/kit/node_full.go index 3e80ed688..697c59aed 100644 --- a/itests/kit/node_full.go +++ b/itests/kit/node_full.go @@ -22,6 +22,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/wallet/key" cliutil "github.com/filecoin-project/lotus/cli/util" + "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" "github.com/filecoin-project/lotus/gateway" "github.com/filecoin-project/lotus/node" ) @@ -54,6 +55,17 @@ type TestFullNode struct { options nodeOpts } +// TestProviderNode represents a Provider node enrolled in an Ensemble. +type TestProviderNode struct { + v1api.LotusProviderStruct + + t *testing.T + + *deps.Deps + + options nodeOpts +} + func MergeFullNodes(fullNodes []*TestFullNode) *TestFullNode { var wrappedFullNode TestFullNode var fns api.FullNodeStruct diff --git a/journal/fsjournal/fs.go b/journal/fsjournal/fs.go index b2eb946fd..5a74fbfc6 100644 --- a/journal/fsjournal/fs.go +++ b/journal/fsjournal/fs.go @@ -7,6 +7,7 @@ import ( "path/filepath" logging "github.com/ipfs/go-log/v2" + "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" "github.com/filecoin-project/lotus/build" @@ -41,6 +42,11 @@ func OpenFSJournal(lr repo.LockedRepo, disabled journal.DisabledEvents) (journal } func OpenFSJournalPath(path string, disabled journal.DisabledEvents) (journal.Journal, error) { + path, err := homedir.Expand(path) + if err != nil { + return nil, xerrors.Errorf("failed to expand repo path: %w", err) + } + dir := filepath.Join(path, "journal") if err := os.MkdirAll(dir, 0755); err != nil { return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)