Folded PR11519 into a shared-capable LP

This commit is contained in:
Andrew Jackson (Ajax) 2023-12-18 16:02:54 -06:00
parent bc76004c15
commit 41bc8f8791
7 changed files with 260 additions and 55 deletions

View File

@ -14,6 +14,7 @@ import (
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/filecoin-project/lotus/node/config"
)
@ -108,9 +109,8 @@ var configSetCmd = &cli.Command{
}
_ = lp
_, err = db.Exec(context.Background(),
`INSERT INTO harmony_config (title, config) VALUES ($1, $2)
ON CONFLICT (title) DO UPDATE SET config = excluded.config`, name, string(bytes))
err = setConfig(db, name, string(bytes))
if err != nil {
return fmt.Errorf("unable to save config layer: %w", err)
}
@ -120,6 +120,13 @@ var configSetCmd = &cli.Command{
},
}
func setConfig(db *harmonydb.DB, name, config string) error {
_, err := db.Exec(context.Background(),
`INSERT INTO harmony_config (title, config) VALUES ($1, $2)
ON CONFLICT (title) DO UPDATE SET config = excluded.config`, name, config)
return err
}
var configGetCmd = &cli.Command{
Name: "get",
Aliases: []string{"cat", "show"},
@ -135,8 +142,7 @@ var configGetCmd = &cli.Command{
return err
}
var cfg string
err = db.QueryRow(context.Background(), `SELECT config FROM harmony_config WHERE title=$1`, args.First()).Scan(&cfg)
cfg, err := getConfig(db, args.First())
if err != nil {
return err
}
@ -146,6 +152,15 @@ var configGetCmd = &cli.Command{
},
}
func getConfig(db *harmonydb.DB, layer string) (string, error) {
var cfg string
err := db.QueryRow(context.Background(), `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&cfg)
if err != nil {
return "", err
}
return cfg, nil
}
var configListCmd = &cli.Command{
Name: "list",
Aliases: []string{"ls"},

View File

@ -45,6 +45,7 @@ func main() {
stopCmd,
configCmd,
testCmd,
webCmd,
//backupCmd,
//lcli.WithCategory("chain", actorCmd),
//lcli.WithCategory("storage", sectorsCmd),

View File

@ -129,26 +129,28 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c
}
log.Infof("Setting up RPC server at %s", dependencies.ListenAddr)
web, err := web.GetSrv(ctx, dependencies)
if err != nil {
return err
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
if err := web.Shutdown(context.Background()); err != nil {
log.Errorf("shutting down web server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
eg := errgroup.Group{}
eg.Go(srv.ListenAndServe)
eg.Go(web.ListenAndServe)
if dependencies.Cfg.Subsystems.EnableWebGui {
web, err := web.GetSrv(ctx, dependencies)
if err != nil {
return err
}
go func() {
<-ctx.Done()
log.Warn("Shutting down...")
if err := srv.Shutdown(context.TODO()); err != nil {
log.Errorf("shutting down RPC server failed: %s", err)
}
if err := web.Shutdown(context.Background()); err != nil {
log.Errorf("shutting down web server failed: %s", err)
}
log.Warn("Graceful shutdown successful")
}()
log.Infof("Setting up web server at %s", dependencies.Cfg.Subsystems.GuiAddress)
eg.Go(web.ListenAndServe)
}
return eg.Wait()
}

View File

@ -1,11 +1,13 @@
package main
import (
"bytes"
"context"
"fmt"
"os"
"time"
"github.com/BurntSushi/toml"
"github.com/pkg/errors"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
@ -19,6 +21,7 @@ import (
"github.com/filecoin-project/lotus/lib/ulimit"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
)
type stackTracer interface {
@ -142,3 +145,36 @@ var runCmd = &cli.Command{
return nil
},
}
var webCmd = &cli.Command{
Name: "web",
Usage: "Start lotus provider web interface",
Description: `Start an instance of lotus provider web interface.
This creates the 'web' layer if it does not exist, then calls run with that layer.`,
Flags: []cli.Flag{
&cli.StringFlag{
Name: "listen",
Usage: "Address to listen on",
Value: "127.0.0.1:4701",
},
},
Action: func(cctx *cli.Context) error {
db, err := deps.MakeDB(cctx)
if err != nil {
return err
}
webtxt, err := getConfig(db, "web")
if err != nil || webtxt == "" {
cfg := config.DefaultLotusProvider()
cfg.Subsystems.EnableWebGui = true
var b bytes.Buffer
toml.NewEncoder(&b).Encode(cfg)
if err = setConfig(db, "web", b.String()); err != nil {
return err
}
}
cctx.Set("layers", "web")
return runCmd.Action(cctx)
},
}

View File

@ -2,14 +2,20 @@
package debug
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/build"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/cmd/lotus-provider/deps"
@ -41,48 +47,149 @@ func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
v1api := d.Deps.Full
ctx := r.Context()
ai := cliutil.ParseApiInfo(d.Deps.Cfg.Apis.ChainApiInfo[0])
ver, err := v1api.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}
sse:
for {
head, err := v1api.ChainHead(ctx)
type minimalApiInfo struct {
Apis struct {
ChainApiInfo []string
}
}
rpcInfos := map[string]minimalApiInfo{} // config name -> api info
confNameToAddr := map[string]string{} // config name -> api address
err := forEachConfig[minimalApiInfo](d, func(name string, info minimalApiInfo) error {
if len(info.Apis.ChainApiInfo) == 0 {
return nil
}
rpcInfos[name] = info
for _, addr := range info.Apis.ChainApiInfo {
ai := cliutil.ParseApiInfo(addr)
confNameToAddr[name] = ai.Addr
}
return nil
})
if err != nil {
log.Warnw("ChainHead", "error", err)
log.Errorw("getting api info", "error", err)
return
}
var syncState string
switch {
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs
syncState = "ok"
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs
syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
default:
syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
apiInfos := map[string][]byte{} // api address -> token
// for dedup by address
for _, info := range rpcInfos {
ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0])
apiInfos[ai.Addr] = ai.Token
}
select {
case <-ctx.Done():
break sse
default:
infos := map[string]rpcInfo{} // api address -> rpc info
var infosLk sync.Mutex
var wg sync.WaitGroup
wg.Add(len(rpcInfos))
for addr, token := range apiInfos {
ai := cliutil.APIInfo{
Addr: addr,
Token: token,
}
var clayers []string
for layer, a := range confNameToAddr {
if a == addr {
clayers = append(clayers, layer)
}
}
da, err := ai.DialArgs("v1")
if err != nil {
log.Warnw("DialArgs", "error", err)
infosLk.Lock()
infos[addr] = rpcInfo{
Address: ai.Addr,
Reachable: false,
CLayers: clayers,
}
infosLk.Unlock()
wg.Done()
continue
}
ah := ai.AuthHeader()
v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah)
if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s", addr)
infosLk.Lock()
infos[addr] = rpcInfo{
Address: ai.Addr,
Reachable: false,
CLayers: clayers,
}
infosLk.Unlock()
wg.Done()
continue
}
go func(info string) {
defer wg.Done()
defer closer()
ver, err := v1api.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}
head, err := v1api.ChainHead(ctx)
if err != nil {
log.Warnw("ChainHead", "error", err)
return
}
var syncState string
switch {
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs
syncState = "ok"
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs
syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
default:
syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
}
var out rpcInfo
out.Address = ai.Addr
out.CLayers = clayers
out.Reachable = true
out.Version = ver.Version
out.SyncState = syncState
infosLk.Lock()
infos[info] = out
infosLk.Unlock()
}(addr)
}
wg.Wait()
infoList := make([]rpcInfo, 0, len(infos))
for _, info := range infos {
infoList = append(infoList, info)
}
sort.Slice(infoList, func(i, j int) bool {
return infoList[i].Address < infoList[j].Address
})
fmt.Fprintf(w, "data: ")
err = json.NewEncoder(w).Encode(rpcInfo{
Address: ai.Addr,
CLayers: []string{},
Reachable: true,
Version: ver.Version,
SyncState: syncState,
})
err = json.NewEncoder(w).Encode(&infoList)
if err != nil {
log.Warnw("json encode", "error", err)
return
@ -91,5 +198,47 @@ sse:
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second)
}
}
func forEachConfig[T any](a *debug, cb func(name string, v T) error) error {
confs, err := a.loadConfigs(context.Background())
if err != nil {
return err
}
for name, tomlStr := range confs { // todo for-each-config
var info T
if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil {
return xerrors.Errorf("unmarshaling %s config: %w", name, err)
}
if err := cb(name, info); err != nil {
return xerrors.Errorf("cb: %w", err)
}
}
return nil
}
func (d *debug) loadConfigs(ctx context.Context) (map[string]string, error) {
//err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
rows, err := d.DB.Query(ctx, `SELECT title, config FROM harmony_config`)
if err != nil {
return nil, xerrors.Errorf("getting db configs: %w", err)
}
configs := make(map[string]string)
for rows.Next() {
var title, config string
if err := rows.Scan(&title, &config); err != nil {
return nil, xerrors.Errorf("scanning db configs: %w", err)
}
configs[title] = config
}
return configs, nil
}

View File

@ -8,7 +8,8 @@ window.customElements.define('chain-connectivity', class MyElement extends LitEl
loadData() {
const eventSource = new EventSource('/api/debug/chain-state-sse');
eventSource.onmessage = (event) => {
this.data.push(JSON.parse(event.data));
this.data = JSON.parse(event.data);
super.requestUpdate();
};
eventSource.onerror = (error) => {
console.error('Error:', error);

View File

@ -97,6 +97,7 @@ type ProviderSubsystemsConfig struct {
EnableWinningPost bool
WinningPostMaxTasks int
EnableWebGui bool
// The address that should listen for Web GUI requests.
GuiAddress string
}