From 9851d35a3811e5339560fb706926bf63a846edae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 24 May 2024 17:48:43 +0200 Subject: [PATCH] feat: curio: jsonrpc in webui (#11904) * curioweb: Setup jsonrpc communication * curioweb: Move chain state from SSE to rpc * appease the linter --- curiosrc/web/api/debug/debug.go | 229 --------------------- curiosrc/web/api/routes.go | 4 +- curiosrc/web/api/webrpc/routes.go | 37 ++++ curiosrc/web/api/webrpc/sync_state.go | 183 ++++++++++++++++ curiosrc/web/static/chain-connectivity.mjs | 23 ++- curiosrc/web/static/lib/jsonrpc.mjs | 96 +++++++++ 6 files changed, 331 insertions(+), 241 deletions(-) delete mode 100644 curiosrc/web/api/debug/debug.go create mode 100644 curiosrc/web/api/webrpc/routes.go create mode 100644 curiosrc/web/api/webrpc/sync_state.go create mode 100644 curiosrc/web/static/lib/jsonrpc.mjs diff --git a/curiosrc/web/api/debug/debug.go b/curiosrc/web/api/debug/debug.go deleted file mode 100644 index c0e89ab8e..000000000 --- a/curiosrc/web/api/debug/debug.go +++ /dev/null @@ -1,229 +0,0 @@ -// Package debug provides the API for various debug endpoints in curio. -package debug - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "sort" - "sync" - "time" - - "github.com/BurntSushi/toml" - "github.com/gorilla/mux" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/api/client" - "github.com/filecoin-project/lotus/build" - cliutil "github.com/filecoin-project/lotus/cli/util" - "github.com/filecoin-project/lotus/cmd/curio/deps" -) - -var log = logging.Logger("curio/web/debug") - -type debug struct { - *deps.Deps -} - -func Routes(r *mux.Router, deps *deps.Deps) { - d := debug{deps} - r.HandleFunc("/chain-state-sse", d.chainStateSSE) -} - -type rpcInfo struct { - Address string - CLayers []string - Reachable bool - SyncState string - Version string -} - -func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Access-Control-Allow-Origin", "*") - w.Header().Set("Access-Control-Allow-Headers", "Content-Type") - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - - ctx := r.Context() - - for { - - type minimalApiInfo struct { - Apis struct { - ChainApiInfo []string - } - } - - rpcInfos := map[string]minimalApiInfo{} // config name -> api info - confNameToAddr := map[string]string{} // config name -> api address - - err := forEachConfig[minimalApiInfo](d, func(name string, info minimalApiInfo) error { - if len(info.Apis.ChainApiInfo) == 0 { - return nil - } - - rpcInfos[name] = info - - for _, addr := range info.Apis.ChainApiInfo { - ai := cliutil.ParseApiInfo(addr) - confNameToAddr[name] = ai.Addr - } - - return nil - }) - if err != nil { - log.Errorw("getting api info", "error", err) - return - } - - dedup := map[string]bool{} // for dedup by address - - infos := map[string]rpcInfo{} // api address -> rpc info - var infosLk sync.Mutex - - var wg sync.WaitGroup - for _, info := range rpcInfos { - ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0]) - if dedup[ai.Addr] { - continue - } - dedup[ai.Addr] = true - wg.Add(1) - go func() { - defer wg.Done() - var clayers []string - for layer, a := range confNameToAddr { - if a == ai.Addr { - clayers = append(clayers, layer) - } - } - - myinfo := rpcInfo{ - Address: ai.Addr, - Reachable: false, - CLayers: clayers, - } - defer func() { - infosLk.Lock() - defer infosLk.Unlock() - infos[ai.Addr] = myinfo - }() - da, err := ai.DialArgs("v1") - if err != nil { - log.Warnw("DialArgs", "error", err) - return - } - - ah := ai.AuthHeader() - - v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah) - if err != nil { - log.Warnf("Not able to establish connection to node with addr: %s", ai.Addr) - return - } - defer closer() - - ver, err := v1api.Version(ctx) - if err != nil { - log.Warnw("Version", "error", err) - return - } - - head, err := v1api.ChainHead(ctx) - if err != nil { - log.Warnw("ChainHead", "error", err) - return - } - - var syncState string - switch { - case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs - syncState = "ok" - case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs - syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) - default: - syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) - } - - myinfo = rpcInfo{ - Address: ai.Addr, - CLayers: clayers, - Reachable: true, - Version: ver.Version, - SyncState: syncState, - } - }() - } - wg.Wait() - - var infoList []rpcInfo - for _, i := range infos { - infoList = append(infoList, i) - } - sort.Slice(infoList, func(i, j int) bool { - return infoList[i].Address < infoList[j].Address - }) - - fmt.Fprintf(w, "data: ") - err = json.NewEncoder(w).Encode(&infoList) - if err != nil { - log.Warnw("json encode", "error", err) - return - } - fmt.Fprintf(w, "\n\n") - if f, ok := w.(http.Flusher); ok { - f.Flush() - } - - time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second) - - select { // stop running if there is reader. - case <-ctx.Done(): - return - default: - } - } -} - -func forEachConfig[T any](a *debug, cb func(name string, v T) error) error { - confs, err := a.loadConfigs(context.Background()) - if err != nil { - return err - } - - for name, tomlStr := range confs { // todo for-each-config - var info T - if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil { - return xerrors.Errorf("unmarshaling %s config: %w", name, err) - } - - if err := cb(name, info); err != nil { - return xerrors.Errorf("cb: %w", err) - } - } - - return nil -} - -func (d *debug) loadConfigs(ctx context.Context) (map[string]string, error) { - //err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - - rows, err := d.DB.Query(ctx, `SELECT title, config FROM harmony_config`) - if err != nil { - return nil, xerrors.Errorf("getting db configs: %w", err) - } - - configs := make(map[string]string) - for rows.Next() { - var title, config string - if err := rows.Scan(&title, &config); err != nil { - return nil, xerrors.Errorf("scanning db configs: %w", err) - } - configs[title] = config - } - - return configs, nil -} diff --git a/curiosrc/web/api/routes.go b/curiosrc/web/api/routes.go index cf56257ee..b0dec6d50 100644 --- a/curiosrc/web/api/routes.go +++ b/curiosrc/web/api/routes.go @@ -6,12 +6,12 @@ import ( "github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/curiosrc/web/api/config" - "github.com/filecoin-project/lotus/curiosrc/web/api/debug" "github.com/filecoin-project/lotus/curiosrc/web/api/sector" + "github.com/filecoin-project/lotus/curiosrc/web/api/webrpc" ) func Routes(r *mux.Router, deps *deps.Deps) { - debug.Routes(r.PathPrefix("/debug").Subrouter(), deps) + webrpc.Routes(r.PathPrefix("/webrpc").Subrouter(), deps) config.Routes(r.PathPrefix("/config").Subrouter(), deps) sector.Routes(r.PathPrefix("/sector").Subrouter(), deps) } diff --git a/curiosrc/web/api/webrpc/routes.go b/curiosrc/web/api/webrpc/routes.go new file mode 100644 index 000000000..f5975e701 --- /dev/null +++ b/curiosrc/web/api/webrpc/routes.go @@ -0,0 +1,37 @@ +package webrpc + +import ( + "context" + + "github.com/gorilla/mux" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/go-jsonrpc" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/cmd/curio/deps" +) + +var log = logging.Logger("webrpc") + +type WebRPC struct { + deps *deps.Deps +} + +func (a *WebRPC) Version(context.Context) (string, error) { + return build.UserVersion(), nil +} + +func (a *WebRPC) BlockDelaySecs(context.Context) (uint64, error) { + return build.BlockDelaySecs, nil +} + +func Routes(r *mux.Router, deps *deps.Deps) { + handler := &WebRPC{ + deps: deps, + } + + rpcSrv := jsonrpc.NewServer() + rpcSrv.Register("CurioWeb", handler) + r.Handle("/v0", rpcSrv) +} diff --git a/curiosrc/web/api/webrpc/sync_state.go b/curiosrc/web/api/webrpc/sync_state.go new file mode 100644 index 000000000..533a52be1 --- /dev/null +++ b/curiosrc/web/api/webrpc/sync_state.go @@ -0,0 +1,183 @@ +package webrpc + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/BurntSushi/toml" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/api/client" + "github.com/filecoin-project/lotus/build" + cliutil "github.com/filecoin-project/lotus/cli/util" +) + +func forEachConfig[T any](a *WebRPC, cb func(name string, v T) error) error { + confs, err := a.loadConfigs(context.Background()) + if err != nil { + return err + } + + for name, tomlStr := range confs { // todo for-each-config + var info T + if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil { + return xerrors.Errorf("unmarshaling %s config: %w", name, err) + } + + if err := cb(name, info); err != nil { + return xerrors.Errorf("cb: %w", err) + } + } + + return nil +} + +func (a *WebRPC) loadConfigs(ctx context.Context) (map[string]string, error) { + //err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) + + rows, err := a.deps.DB.Query(ctx, `SELECT title, config FROM harmony_config`) + if err != nil { + return nil, xerrors.Errorf("getting db configs: %w", err) + } + + configs := make(map[string]string) + for rows.Next() { + var title, config string + if err := rows.Scan(&title, &config); err != nil { + return nil, xerrors.Errorf("scanning db configs: %w", err) + } + configs[title] = config + } + + return configs, nil +} + +type RpcInfo struct { + Address string + CLayers []string + Reachable bool + SyncState string + Version string +} + +func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) { + type minimalApiInfo struct { + Apis struct { + ChainApiInfo []string + } + } + + rpcInfos := map[string]minimalApiInfo{} // config name -> api info + confNameToAddr := map[string]string{} // config name -> api address + + err := forEachConfig[minimalApiInfo](a, func(name string, info minimalApiInfo) error { + if len(info.Apis.ChainApiInfo) == 0 { + return nil + } + + rpcInfos[name] = info + + for _, addr := range info.Apis.ChainApiInfo { + ai := cliutil.ParseApiInfo(addr) + confNameToAddr[name] = ai.Addr + } + + return nil + }) + if err != nil { + return nil, err + } + + dedup := map[string]bool{} // for dedup by address + + infos := map[string]RpcInfo{} // api address -> rpc info + var infosLk sync.Mutex + + var wg sync.WaitGroup + for _, info := range rpcInfos { + ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0]) + if dedup[ai.Addr] { + continue + } + dedup[ai.Addr] = true + wg.Add(1) + go func() { + defer wg.Done() + var clayers []string + for layer, a := range confNameToAddr { + if a == ai.Addr { + clayers = append(clayers, layer) + } + } + + myinfo := RpcInfo{ + Address: ai.Addr, + Reachable: false, + CLayers: clayers, + } + defer func() { + infosLk.Lock() + defer infosLk.Unlock() + infos[ai.Addr] = myinfo + }() + da, err := ai.DialArgs("v1") + if err != nil { + log.Warnw("DialArgs", "error", err) + return + } + + ah := ai.AuthHeader() + + v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah) + if err != nil { + log.Warnf("Not able to establish connection to node with addr: %s", ai.Addr) + return + } + defer closer() + + ver, err := v1api.Version(ctx) + if err != nil { + log.Warnw("Version", "error", err) + return + } + + head, err := v1api.ChainHead(ctx) + if err != nil { + log.Warnw("ChainHead", "error", err) + return + } + + var syncState string + switch { + case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs + syncState = "ok" + case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs + syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) + default: + syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second)) + } + + myinfo = RpcInfo{ + Address: ai.Addr, + CLayers: clayers, + Reachable: true, + Version: ver.Version, + SyncState: syncState, + } + }() + } + wg.Wait() + + var infoList []RpcInfo + for _, i := range infos { + infoList = append(infoList, i) + } + sort.Slice(infoList, func(i, j int) bool { + return infoList[i].Address < infoList[j].Address + }) + + return infoList, nil +} diff --git a/curiosrc/web/static/chain-connectivity.mjs b/curiosrc/web/static/chain-connectivity.mjs index c090193e2..60e6888ab 100644 --- a/curiosrc/web/static/chain-connectivity.mjs +++ b/curiosrc/web/static/chain-connectivity.mjs @@ -1,22 +1,25 @@ import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js'; +import RPCCall from '/lib/jsonrpc.mjs'; + window.customElements.define('chain-connectivity', class MyElement extends LitElement { constructor() { super(); this.data = []; this.loadData(); } - loadData() { - const eventSource = new EventSource('/api/debug/chain-state-sse'); - eventSource.onmessage = (event) => { - this.data = JSON.parse(event.data); - super.requestUpdate(); - }; - eventSource.onerror = (error) => { - console.error('Error:', error); - loadData(); - }; + + async loadData() { + const blockDelay = await RPCCall('BlockDelaySecs') + await this.updateData(); + setInterval(this.update, blockDelay * 1000); }; + async updateData() { + this.data = await RPCCall('SyncerState'); + console.log(this.data); + super.requestUpdate(); + } + static get styles() { return [css` :host { diff --git a/curiosrc/web/static/lib/jsonrpc.mjs b/curiosrc/web/static/lib/jsonrpc.mjs new file mode 100644 index 000000000..4f560f0bb --- /dev/null +++ b/curiosrc/web/static/lib/jsonrpc.mjs @@ -0,0 +1,96 @@ +class JsonRpcClient { + static instance = null; + + static async getInstance() { + if (!JsonRpcClient.instance) { + JsonRpcClient.instance = (async () => { + const client = new JsonRpcClient('/api/webrpc/v0'); + await client.connect(); + return client; + })(); + } + return await JsonRpcClient.instance; + } + + + constructor(url) { + if (JsonRpcClient.instance) { + throw new Error("Error: Instantiation failed: Use getInstance() instead of new."); + } + this.url = url; + this.requestId = 0; + this.pendingRequests = new Map(); + } + + async connect() { + return new Promise((resolve, reject) => { + this.ws = new WebSocket(this.url); + + this.ws.onopen = () => { + console.log("Connected to the server"); + resolve(); + }; + + this.ws.onclose = () => { + console.log("Connection closed, attempting to reconnect..."); + setTimeout(() => this.connect().then(resolve, reject), 1000); // Reconnect after 1 second + }; + + this.ws.onerror = (error) => { + console.error("WebSocket error:", error); + reject(error); + }; + + this.ws.onmessage = (message) => { + this.handleMessage(message); + }; + }); + } + + handleMessage(message) { + const response = JSON.parse(message.data); + const { id, result, error } = response; + + const resolver = this.pendingRequests.get(id); + if (resolver) { + if (error) { + resolver.reject(error); + } else { + resolver.resolve(result); + } + this.pendingRequests.delete(id); + } + } + + call(method, params = []) { + const id = ++this.requestId; + const request = { + jsonrpc: "2.0", + method: "CurioWeb." + method, + params, + id, + }; + + return new Promise((resolve, reject) => { + this.pendingRequests.set(id, { resolve, reject }); + + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(request)); + } else { + reject('WebSocket is not open'); + } + }); + } +} + +async function init() { + const client = await JsonRpcClient.getInstance(); + console.log("webrpc backend:", await client.call('Version', [])) +} + +init(); + +export default async function(method, params = []) { + const i = await JsonRpcClient.getInstance(); + return await i.call(method, params); +}