From 41bc8f879175e961abe5e26c566d676bd4bdb5fc Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 18 Dec 2023 16:02:54 -0600 Subject: [PATCH] Folded PR11519 into a shared-capable LP --- cmd/lotus-provider/config.go | 25 ++- cmd/lotus-provider/main.go | 1 + cmd/lotus-provider/rpc/rpc.go | 40 ++-- cmd/lotus-provider/run.go | 36 +++ cmd/lotus-provider/web/api/debug/debug.go | 209 +++++++++++++++--- .../web/static/chain-connectivity.js | 3 +- node/config/types.go | 1 + 7 files changed, 260 insertions(+), 55 deletions(-) diff --git a/cmd/lotus-provider/config.go b/cmd/lotus-provider/config.go index 49eed327a..44ba49beb 100644 --- a/cmd/lotus-provider/config.go +++ b/cmd/lotus-provider/config.go @@ -14,6 +14,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" + "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/node/config" ) @@ -108,9 +109,8 @@ var configSetCmd = &cli.Command{ } _ = lp - _, err = db.Exec(context.Background(), - `INSERT INTO harmony_config (title, config) VALUES ($1, $2) - ON CONFLICT (title) DO UPDATE SET config = excluded.config`, name, string(bytes)) + err = setConfig(db, name, string(bytes)) + if err != nil { return fmt.Errorf("unable to save config layer: %w", err) } @@ -120,6 +120,13 @@ var configSetCmd = &cli.Command{ }, } +func setConfig(db *harmonydb.DB, name, config string) error { + _, err := db.Exec(context.Background(), + `INSERT INTO harmony_config (title, config) VALUES ($1, $2) + ON CONFLICT (title) DO UPDATE SET config = excluded.config`, name, config) + return err +} + var configGetCmd = &cli.Command{ Name: "get", Aliases: []string{"cat", "show"}, @@ -135,8 +142,7 @@ var configGetCmd = &cli.Command{ return err } - var cfg string - err = db.QueryRow(context.Background(), `SELECT config FROM harmony_config WHERE title=$1`, args.First()).Scan(&cfg) + cfg, err := getConfig(db, args.First()) if err != nil { return err } @@ -146,6 +152,15 @@ var configGetCmd = &cli.Command{ }, } +func getConfig(db *harmonydb.DB, layer string) (string, error) { + var cfg string + err := db.QueryRow(context.Background(), `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&cfg) + if err != nil { + return "", err + } + return cfg, nil +} + var configListCmd = &cli.Command{ Name: "list", Aliases: []string{"ls"}, diff --git a/cmd/lotus-provider/main.go b/cmd/lotus-provider/main.go index 18b2e87f3..6ef71b53f 100644 --- a/cmd/lotus-provider/main.go +++ b/cmd/lotus-provider/main.go @@ -45,6 +45,7 @@ func main() { stopCmd, configCmd, testCmd, + webCmd, //backupCmd, //lcli.WithCategory("chain", actorCmd), //lcli.WithCategory("storage", sectorsCmd), diff --git a/cmd/lotus-provider/rpc/rpc.go b/cmd/lotus-provider/rpc/rpc.go index 4b4a77cf9..d77d8c81e 100644 --- a/cmd/lotus-provider/rpc/rpc.go +++ b/cmd/lotus-provider/rpc/rpc.go @@ -129,26 +129,28 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c } log.Infof("Setting up RPC server at %s", dependencies.ListenAddr) - - web, err := web.GetSrv(ctx, dependencies) - if err != nil { - return err - } - - go func() { - <-ctx.Done() - log.Warn("Shutting down...") - if err := srv.Shutdown(context.TODO()); err != nil { - log.Errorf("shutting down RPC server failed: %s", err) - } - if err := web.Shutdown(context.Background()); err != nil { - log.Errorf("shutting down web server failed: %s", err) - } - log.Warn("Graceful shutdown successful") - }() - eg := errgroup.Group{} eg.Go(srv.ListenAndServe) - eg.Go(web.ListenAndServe) + + if dependencies.Cfg.Subsystems.EnableWebGui { + web, err := web.GetSrv(ctx, dependencies) + if err != nil { + return err + } + + go func() { + <-ctx.Done() + log.Warn("Shutting down...") + if err := srv.Shutdown(context.TODO()); err != nil { + log.Errorf("shutting down RPC server failed: %s", err) + } + if err := web.Shutdown(context.Background()); err != nil { + log.Errorf("shutting down web server failed: %s", err) + } + log.Warn("Graceful shutdown successful") + }() + log.Infof("Setting up web server at %s", dependencies.Cfg.Subsystems.GuiAddress) + eg.Go(web.ListenAndServe) + } return eg.Wait() } diff --git a/cmd/lotus-provider/run.go b/cmd/lotus-provider/run.go index 8e7d5ad79..50c8ababd 100644 --- a/cmd/lotus-provider/run.go +++ b/cmd/lotus-provider/run.go @@ -1,11 +1,13 @@ package main import ( + "bytes" "context" "fmt" "os" "time" + "github.com/BurntSushi/toml" "github.com/pkg/errors" "github.com/urfave/cli/v2" "go.opencensus.io/stats" @@ -19,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/lib/ulimit" "github.com/filecoin-project/lotus/metrics" "github.com/filecoin-project/lotus/node" + "github.com/filecoin-project/lotus/node/config" ) type stackTracer interface { @@ -142,3 +145,36 @@ var runCmd = &cli.Command{ return nil }, } + +var webCmd = &cli.Command{ + Name: "web", + Usage: "Start lotus provider web interface", + Description: `Start an instance of lotus provider web interface. + This creates the 'web' layer if it does not exist, then calls run with that layer.`, + Flags: []cli.Flag{ + &cli.StringFlag{ + Name: "listen", + Usage: "Address to listen on", + Value: "127.0.0.1:4701", + }, + }, + Action: func(cctx *cli.Context) error { + db, err := deps.MakeDB(cctx) + if err != nil { + return err + } + + webtxt, err := getConfig(db, "web") + if err != nil || webtxt == "" { + cfg := config.DefaultLotusProvider() + cfg.Subsystems.EnableWebGui = true + var b bytes.Buffer + toml.NewEncoder(&b).Encode(cfg) + if err = setConfig(db, "web", b.String()); err != nil { + return err + } + } + cctx.Set("layers", "web") + return runCmd.Action(cctx) + }, +} diff --git a/cmd/lotus-provider/web/api/debug/debug.go b/cmd/lotus-provider/web/api/debug/debug.go index 1dcd7c5a3..f97bd1502 100644 --- a/cmd/lotus-provider/web/api/debug/debug.go +++ b/cmd/lotus-provider/web/api/debug/debug.go @@ -2,14 +2,20 @@ package debug import ( + "context" "encoding/json" "fmt" "net/http" + "sort" + "sync" "time" + "github.com/BurntSushi/toml" "github.com/gorilla/mux" logging "github.com/ipfs/go-log/v2" + "golang.org/x/xerrors" + "github.com/filecoin-project/lotus/api/client" "github.com/filecoin-project/lotus/build" cliutil "github.com/filecoin-project/lotus/cli/util" "github.com/filecoin-project/lotus/cmd/lotus-provider/deps" @@ -41,48 +47,149 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) { w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - v1api := d.Deps.Full ctx := r.Context() - ai := cliutil.ParseApiInfo(d.Deps.Cfg.Apis.ChainApiInfo[0]) - ver, err := v1api.Version(ctx) - if err != nil { - log.Warnw("Version", "error", err) - return - } - -sse: for { - head, err := v1api.ChainHead(ctx) + + type minimalApiInfo struct { + Apis struct { + ChainApiInfo []string + } + } + + rpcInfos := map[string]minimalApiInfo{} // config name -> api info + confNameToAddr := map[string]string{} // config name -> api address + + err := forEachConfig[minimalApiInfo](d, func(name string, info minimalApiInfo) error { + if len(info.Apis.ChainApiInfo) == 0 { + return nil + } + + rpcInfos[name] = info + + for _, addr := range info.Apis.ChainApiInfo { + ai := cliutil.ParseApiInfo(addr) + confNameToAddr[name] = ai.Addr + } + + return nil + }) if err != nil { - log.Warnw("ChainHead", "error", err) + log.Errorw("getting api info", "error", err) return } - var syncState string - switch { - case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs - syncState = "ok" - case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs - syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) - default: - syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) + apiInfos := map[string][]byte{} // api address -> token + // for dedup by address + for _, info := range rpcInfos { + ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0]) + apiInfos[ai.Addr] = ai.Token } - select { - case <-ctx.Done(): - break sse - default: + infos := map[string]rpcInfo{} // api address -> rpc info + var infosLk sync.Mutex + + var wg sync.WaitGroup + wg.Add(len(rpcInfos)) + for addr, token := range apiInfos { + ai := cliutil.APIInfo{ + Addr: addr, + Token: token, + } + + var clayers []string + for layer, a := range confNameToAddr { + if a == addr { + clayers = append(clayers, layer) + } + } + + da, err := ai.DialArgs("v1") + if err != nil { + log.Warnw("DialArgs", "error", err) + + infosLk.Lock() + infos[addr] = rpcInfo{ + Address: ai.Addr, + Reachable: false, + CLayers: clayers, + } + infosLk.Unlock() + + wg.Done() + continue + } + + ah := ai.AuthHeader() + + v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah) + if err != nil { + log.Warnf("Not able to establish connection to node with addr: %s", addr) + + infosLk.Lock() + infos[addr] = rpcInfo{ + Address: ai.Addr, + Reachable: false, + CLayers: clayers, + } + infosLk.Unlock() + + wg.Done() + continue + } + + go func(info string) { + defer wg.Done() + defer closer() + + ver, err := v1api.Version(ctx) + if err != nil { + log.Warnw("Version", "error", err) + return + } + + head, err := v1api.ChainHead(ctx) + if err != nil { + log.Warnw("ChainHead", "error", err) + return + } + + var syncState string + switch { + case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs + syncState = "ok" + case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs + syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) + default: + syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) + } + + var out rpcInfo + out.Address = ai.Addr + out.CLayers = clayers + out.Reachable = true + out.Version = ver.Version + out.SyncState = syncState + + infosLk.Lock() + infos[info] = out + infosLk.Unlock() + + }(addr) } + wg.Wait() + + infoList := make([]rpcInfo, 0, len(infos)) + for _, info := range infos { + infoList = append(infoList, info) + } + + sort.Slice(infoList, func(i, j int) bool { + return infoList[i].Address < infoList[j].Address + }) fmt.Fprintf(w, "data: ") - err = json.NewEncoder(w).Encode(rpcInfo{ - Address: ai.Addr, - CLayers: []string{}, - Reachable: true, - Version: ver.Version, - SyncState: syncState, - }) + err = json.NewEncoder(w).Encode(&infoList) if err != nil { log.Warnw("json encode", "error", err) return @@ -91,5 +198,47 @@ sse: if f, ok := w.(http.Flusher); ok { f.Flush() } + + time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second) } } + +func forEachConfig[T any](a *debug, cb func(name string, v T) error) error { + confs, err := a.loadConfigs(context.Background()) + if err != nil { + return err + } + + for name, tomlStr := range confs { // todo for-each-config + var info T + if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil { + return xerrors.Errorf("unmarshaling %s config: %w", name, err) + } + + if err := cb(name, info); err != nil { + return xerrors.Errorf("cb: %w", err) + } + } + + return nil +} + +func (d *debug) loadConfigs(ctx context.Context) (map[string]string, error) { + //err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) + + rows, err := d.DB.Query(ctx, `SELECT title, config FROM harmony_config`) + if err != nil { + return nil, xerrors.Errorf("getting db configs: %w", err) + } + + configs := make(map[string]string) + for rows.Next() { + var title, config string + if err := rows.Scan(&title, &config); err != nil { + return nil, xerrors.Errorf("scanning db configs: %w", err) + } + configs[title] = config + } + + return configs, nil +} diff --git a/cmd/lotus-provider/web/static/chain-connectivity.js b/cmd/lotus-provider/web/static/chain-connectivity.js index 6fc63234f..1b37666bb 100644 --- a/cmd/lotus-provider/web/static/chain-connectivity.js +++ b/cmd/lotus-provider/web/static/chain-connectivity.js @@ -8,7 +8,8 @@ window.customElements.define('chain-connectivity', class MyElement extends LitEl loadData() { const eventSource = new EventSource('/api/debug/chain-state-sse'); eventSource.onmessage = (event) => { - this.data.push(JSON.parse(event.data)); + this.data = JSON.parse(event.data); + super.requestUpdate(); }; eventSource.onerror = (error) => { console.error('Error:', error); diff --git a/node/config/types.go b/node/config/types.go index 6233191f4..0a6de9e93 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -97,6 +97,7 @@ type ProviderSubsystemsConfig struct { EnableWinningPost bool WinningPostMaxTasks int + EnableWebGui bool // The address that should listen for Web GUI requests. GuiAddress string }