From 6c8605c077c559b531ca3fb246538f7a57567acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 12 Jan 2024 14:09:12 +0100 Subject: [PATCH] lpweb: Fix actor display --- provider/lpweb/hapi/robust_rpc.go | 89 ++++++++++++++ provider/lpweb/hapi/routes.go | 3 + provider/lpweb/hapi/simpleinfo.go | 4 + provider/lpweb/hapi/watch_actor.go | 181 +++++++++++++++++++++++++++++ provider/lpweb/static/index.html | 4 +- 5 files changed, 279 insertions(+), 2 deletions(-) create mode 100644 provider/lpweb/hapi/robust_rpc.go create mode 100644 provider/lpweb/hapi/watch_actor.go diff --git a/provider/lpweb/hapi/robust_rpc.go b/provider/lpweb/hapi/robust_rpc.go new file mode 100644 index 000000000..24274f329 --- /dev/null +++ b/provider/lpweb/hapi/robust_rpc.go @@ -0,0 +1,89 @@ +package hapi + +import ( + "context" + "github.com/filecoin-project/lotus/api/client" + cliutil "github.com/filecoin-project/lotus/cli/util" + "time" +) + +func (a *app) watchRpc() { + ticker := time.NewTicker(watchInterval) + for { + err := a.updateRpc(context.TODO()) + if err != nil { + log.Errorw("updating rpc info", "error", err) + } + select { + case <-ticker.C: + } + } +} + +type minimalApiInfo struct { + Apis struct { + ChainApiInfo []string + } +} + +func (a *app) updateRpc(ctx context.Context) error { + rpcInfos := map[string]minimalApiInfo{} // config name -> api info + confNameToAddr := map[string]string{} // config name -> api address + + err := forEachConfig[minimalApiInfo](a, func(name string, info minimalApiInfo) error { + if len(info.Apis.ChainApiInfo) == 0 { + return nil + } + + rpcInfos[name] = info + + for _, addr := range info.Apis.ChainApiInfo { + ai := cliutil.ParseApiInfo(addr) + confNameToAddr[name] = ai.Addr + } + + return nil + }) + if err != nil { + return err + } + + apiInfos := map[string][]byte{} // api address -> token + + // for dedup by address + for _, info := range rpcInfos { + ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0]) + apiInfos[ai.Addr] = ai.Token + } + + a.rpcInfoLk.Lock() + + // todo improve this shared rpc logic + if a.workingApi == nil { + for addr, token := range apiInfos { + ai := cliutil.APIInfo{ + Addr: addr, + Token: token, + } + + da, err := ai.DialArgs("v1") + if err != nil { + continue + } + + ah := ai.AuthHeader() + + v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah) + if err != nil { + continue + } + _ = closer // todo + + a.workingApi = v1api + } + } + + a.rpcInfoLk.Unlock() + + return nil +} diff --git a/provider/lpweb/hapi/routes.go b/provider/lpweb/hapi/routes.go index b07ab60a5..7a2e3d9be 100644 --- a/provider/lpweb/hapi/routes.go +++ b/provider/lpweb/hapi/routes.go @@ -25,6 +25,9 @@ func Routes(r *mux.Router, deps *deps.Deps) error { t: t, } + go a.watchRpc() + go a.watchActor() + r.HandleFunc("/simpleinfo/actorsummary", a.actorSummary) r.HandleFunc("/simpleinfo/machines", a.indexMachines) r.HandleFunc("/simpleinfo/tasks", a.indexTasks) diff --git a/provider/lpweb/hapi/simpleinfo.go b/provider/lpweb/hapi/simpleinfo.go index ee36a1e17..40b4b3ed6 100644 --- a/provider/lpweb/hapi/simpleinfo.go +++ b/provider/lpweb/hapi/simpleinfo.go @@ -2,6 +2,7 @@ package hapi import ( "context" + "github.com/filecoin-project/lotus/api/v1api" "html/template" "net/http" "os" @@ -15,6 +16,9 @@ type app struct { db *harmonydb.DB t *template.Template + rpcInfoLk sync.Mutex + workingApi v1api.FullNode + actorInfoLk sync.Mutex actorInfos []actorInfo } diff --git a/provider/lpweb/hapi/watch_actor.go b/provider/lpweb/hapi/watch_actor.go new file mode 100644 index 000000000..31cf9ac1f --- /dev/null +++ b/provider/lpweb/hapi/watch_actor.go @@ -0,0 +1,181 @@ +package hapi + +import ( + "context" + "github.com/BurntSushi/toml" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/lotus/chain/types" + "golang.org/x/xerrors" + "sort" + "time" +) + +const watchInterval = time.Second * 10 + +func (a *app) watchActor() { + ticker := time.NewTicker(watchInterval) + for { + err := a.updateActor(context.TODO()) + if err != nil { + log.Errorw("updating rpc info", "error", err) + } + select { + case <-ticker.C: + } + } +} + +type minimalActorInfo struct { + Addresses struct { + MinerAddresses []string + } +} + +func (a *app) updateActor(ctx context.Context) error { + a.rpcInfoLk.Lock() + api := a.workingApi + a.rpcInfoLk.Unlock() + + if api == nil { + log.Warnw("no working api yet") + return nil + } + + var actorInfos []actorInfo + + confNameToAddr := map[address.Address][]string{} // address -> config names + + err := forEachConfig[minimalActorInfo](a, func(name string, info minimalActorInfo) error { + for _, addr := range info.Addresses.MinerAddresses { + a, err := address.NewFromString(addr) + if err != nil { + return xerrors.Errorf("parsing address: %w", err) + } + confNameToAddr[a] = append(confNameToAddr[a], name) + } + + return nil + }) + if err != nil { + return err + } + + for addr, cnames := range confNameToAddr { + p, err := api.StateMinerPower(ctx, addr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting miner power: %w", err) + } + + dls, err := api.StateMinerDeadlines(ctx, addr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting deadlines: %w", err) + } + + outDls := []actorDeadline{} + + for dlidx := range dls { + p, err := api.StateMinerPartitions(ctx, addr, uint64(dlidx), types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting partition: %w", err) + } + + dl := actorDeadline{ + Empty: false, + Current: false, // todo + Proven: false, + PartFaulty: false, + Faulty: false, + } + + var live, faulty uint64 + + for _, part := range p { + l, err := part.LiveSectors.Count() + if err != nil { + return xerrors.Errorf("getting live sectors: %w", err) + } + live += l + + f, err := part.FaultySectors.Count() + if err != nil { + return xerrors.Errorf("getting faulty sectors: %w", err) + } + faulty += f + } + + dl.Empty = live == 0 + dl.Proven = live > 0 && faulty == 0 + dl.PartFaulty = faulty > 0 + dl.Faulty = faulty > 0 && faulty == live + + outDls = append(outDls, dl) + } + + pd, err := api.StateMinerProvingDeadline(ctx, addr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting proving deadline: %w", err) + } + + if len(outDls) != 48 { + return xerrors.Errorf("expected 48 deadlines, got %d", len(outDls)) + } + + outDls[pd.Index].Current = true + + actorInfos = append(actorInfos, actorInfo{ + Address: addr.String(), + CLayers: cnames, + QualityAdjustedPower: types.DeciStr(p.MinerPower.QualityAdjPower), + RawBytePower: types.DeciStr(p.MinerPower.RawBytePower), + Deadlines: outDls, + }) + } + + sort.Slice(actorInfos, func(i, j int) bool { + return actorInfos[i].Address < actorInfos[j].Address + }) + + a.actorInfoLk.Lock() + a.actorInfos = actorInfos + a.actorInfoLk.Unlock() + + return nil +} + +func (a *app) loadConfigs(ctx context.Context) (map[string]string, error) { + rows, err := a.db.Query(ctx, `SELECT title, config FROM harmony_config`) + if err != nil { + return nil, xerrors.Errorf("getting db configs: %w", err) + } + + configs := make(map[string]string) + for rows.Next() { + var title, config string + if err := rows.Scan(&title, &config); err != nil { + return nil, xerrors.Errorf("scanning db configs: %w", err) + } + configs[title] = config + } + + return configs, nil +} + +func forEachConfig[T any](a *app, cb func(name string, v T) error) error { + confs, err := a.loadConfigs(context.Background()) + if err != nil { + return err + } + + for name, tomlStr := range confs { + var info T + if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil { + return xerrors.Errorf("unmarshaling %s config: %w", name, err) + } + + if err := cb(name, info); err != nil { + return xerrors.Errorf("cb: %w", err) + } + } + + return nil +} diff --git a/provider/lpweb/static/index.html b/provider/lpweb/static/index.html index 98f7336ad..d0ca8eb16 100644 --- a/provider/lpweb/static/index.html +++ b/provider/lpweb/static/index.html @@ -168,7 +168,7 @@ Message - + @@ -184,7 +184,7 @@ Owner - +