feat: curio: jsonrpc in webui (#11904)

* curioweb: Setup jsonrpc communication

* curioweb: Move chain state from SSE to rpc

* appease the linter
This commit is contained in:
Łukasz Magiera 2024-05-24 17:48:43 +02:00 committed by GitHub
parent 759709b768
commit 9851d35a38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 331 additions and 241 deletions

View File

@ -1,229 +0,0 @@
// Package debug provides the API for various debug endpoints in curio.
package debug
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sort"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/build"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/cmd/curio/deps"
)
var log = logging.Logger("curio/web/debug")
type debug struct {
*deps.Deps
}
func Routes(r *mux.Router, deps *deps.Deps) {
d := debug{deps}
r.HandleFunc("/chain-state-sse", d.chainStateSSE)
}
type rpcInfo struct {
Address string
CLayers []string
Reachable bool
SyncState string
Version string
}
func (d *debug) chainStateSSE(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
ctx := r.Context()
for {
type minimalApiInfo struct {
Apis struct {
ChainApiInfo []string
}
}
rpcInfos := map[string]minimalApiInfo{} // config name -> api info
confNameToAddr := map[string]string{} // config name -> api address
err := forEachConfig[minimalApiInfo](d, func(name string, info minimalApiInfo) error {
if len(info.Apis.ChainApiInfo) == 0 {
return nil
}
rpcInfos[name] = info
for _, addr := range info.Apis.ChainApiInfo {
ai := cliutil.ParseApiInfo(addr)
confNameToAddr[name] = ai.Addr
}
return nil
})
if err != nil {
log.Errorw("getting api info", "error", err)
return
}
dedup := map[string]bool{} // for dedup by address
infos := map[string]rpcInfo{} // api address -> rpc info
var infosLk sync.Mutex
var wg sync.WaitGroup
for _, info := range rpcInfos {
ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0])
if dedup[ai.Addr] {
continue
}
dedup[ai.Addr] = true
wg.Add(1)
go func() {
defer wg.Done()
var clayers []string
for layer, a := range confNameToAddr {
if a == ai.Addr {
clayers = append(clayers, layer)
}
}
myinfo := rpcInfo{
Address: ai.Addr,
Reachable: false,
CLayers: clayers,
}
defer func() {
infosLk.Lock()
defer infosLk.Unlock()
infos[ai.Addr] = myinfo
}()
da, err := ai.DialArgs("v1")
if err != nil {
log.Warnw("DialArgs", "error", err)
return
}
ah := ai.AuthHeader()
v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah)
if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s", ai.Addr)
return
}
defer closer()
ver, err := v1api.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}
head, err := v1api.ChainHead(ctx)
if err != nil {
log.Warnw("ChainHead", "error", err)
return
}
var syncState string
switch {
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs
syncState = "ok"
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs
syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
default:
syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
}
myinfo = rpcInfo{
Address: ai.Addr,
CLayers: clayers,
Reachable: true,
Version: ver.Version,
SyncState: syncState,
}
}()
}
wg.Wait()
var infoList []rpcInfo
for _, i := range infos {
infoList = append(infoList, i)
}
sort.Slice(infoList, func(i, j int) bool {
return infoList[i].Address < infoList[j].Address
})
fmt.Fprintf(w, "data: ")
err = json.NewEncoder(w).Encode(&infoList)
if err != nil {
log.Warnw("json encode", "error", err)
return
}
fmt.Fprintf(w, "\n\n")
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
time.Sleep(time.Duration(build.BlockDelaySecs) * time.Second)
select { // stop running if there is reader.
case <-ctx.Done():
return
default:
}
}
}
func forEachConfig[T any](a *debug, cb func(name string, v T) error) error {
confs, err := a.loadConfigs(context.Background())
if err != nil {
return err
}
for name, tomlStr := range confs { // todo for-each-config
var info T
if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil {
return xerrors.Errorf("unmarshaling %s config: %w", name, err)
}
if err := cb(name, info); err != nil {
return xerrors.Errorf("cb: %w", err)
}
}
return nil
}
func (d *debug) loadConfigs(ctx context.Context) (map[string]string, error) {
//err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
rows, err := d.DB.Query(ctx, `SELECT title, config FROM harmony_config`)
if err != nil {
return nil, xerrors.Errorf("getting db configs: %w", err)
}
configs := make(map[string]string)
for rows.Next() {
var title, config string
if err := rows.Scan(&title, &config); err != nil {
return nil, xerrors.Errorf("scanning db configs: %w", err)
}
configs[title] = config
}
return configs, nil
}

View File

@ -6,12 +6,12 @@ import (
"github.com/filecoin-project/lotus/cmd/curio/deps"
"github.com/filecoin-project/lotus/curiosrc/web/api/config"
"github.com/filecoin-project/lotus/curiosrc/web/api/debug"
"github.com/filecoin-project/lotus/curiosrc/web/api/sector"
"github.com/filecoin-project/lotus/curiosrc/web/api/webrpc"
)
func Routes(r *mux.Router, deps *deps.Deps) {
debug.Routes(r.PathPrefix("/debug").Subrouter(), deps)
webrpc.Routes(r.PathPrefix("/webrpc").Subrouter(), deps)
config.Routes(r.PathPrefix("/config").Subrouter(), deps)
sector.Routes(r.PathPrefix("/sector").Subrouter(), deps)
}

View File

@ -0,0 +1,37 @@
package webrpc
import (
"context"
"github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2"
"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/cmd/curio/deps"
)
var log = logging.Logger("webrpc")
type WebRPC struct {
deps *deps.Deps
}
func (a *WebRPC) Version(context.Context) (string, error) {
return build.UserVersion(), nil
}
func (a *WebRPC) BlockDelaySecs(context.Context) (uint64, error) {
return build.BlockDelaySecs, nil
}
func Routes(r *mux.Router, deps *deps.Deps) {
handler := &WebRPC{
deps: deps,
}
rpcSrv := jsonrpc.NewServer()
rpcSrv.Register("CurioWeb", handler)
r.Handle("/v0", rpcSrv)
}

View File

@ -0,0 +1,183 @@
package webrpc
import (
"context"
"fmt"
"sort"
"sync"
"time"
"github.com/BurntSushi/toml"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api/client"
"github.com/filecoin-project/lotus/build"
cliutil "github.com/filecoin-project/lotus/cli/util"
)
func forEachConfig[T any](a *WebRPC, cb func(name string, v T) error) error {
confs, err := a.loadConfigs(context.Background())
if err != nil {
return err
}
for name, tomlStr := range confs { // todo for-each-config
var info T
if err := toml.Unmarshal([]byte(tomlStr), &info); err != nil {
return xerrors.Errorf("unmarshaling %s config: %w", name, err)
}
if err := cb(name, info); err != nil {
return xerrors.Errorf("cb: %w", err)
}
}
return nil
}
func (a *WebRPC) loadConfigs(ctx context.Context) (map[string]string, error) {
//err := db.QueryRow(cctx.Context, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text)
rows, err := a.deps.DB.Query(ctx, `SELECT title, config FROM harmony_config`)
if err != nil {
return nil, xerrors.Errorf("getting db configs: %w", err)
}
configs := make(map[string]string)
for rows.Next() {
var title, config string
if err := rows.Scan(&title, &config); err != nil {
return nil, xerrors.Errorf("scanning db configs: %w", err)
}
configs[title] = config
}
return configs, nil
}
type RpcInfo struct {
Address string
CLayers []string
Reachable bool
SyncState string
Version string
}
func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
type minimalApiInfo struct {
Apis struct {
ChainApiInfo []string
}
}
rpcInfos := map[string]minimalApiInfo{} // config name -> api info
confNameToAddr := map[string]string{} // config name -> api address
err := forEachConfig[minimalApiInfo](a, func(name string, info minimalApiInfo) error {
if len(info.Apis.ChainApiInfo) == 0 {
return nil
}
rpcInfos[name] = info
for _, addr := range info.Apis.ChainApiInfo {
ai := cliutil.ParseApiInfo(addr)
confNameToAddr[name] = ai.Addr
}
return nil
})
if err != nil {
return nil, err
}
dedup := map[string]bool{} // for dedup by address
infos := map[string]RpcInfo{} // api address -> rpc info
var infosLk sync.Mutex
var wg sync.WaitGroup
for _, info := range rpcInfos {
ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0])
if dedup[ai.Addr] {
continue
}
dedup[ai.Addr] = true
wg.Add(1)
go func() {
defer wg.Done()
var clayers []string
for layer, a := range confNameToAddr {
if a == ai.Addr {
clayers = append(clayers, layer)
}
}
myinfo := RpcInfo{
Address: ai.Addr,
Reachable: false,
CLayers: clayers,
}
defer func() {
infosLk.Lock()
defer infosLk.Unlock()
infos[ai.Addr] = myinfo
}()
da, err := ai.DialArgs("v1")
if err != nil {
log.Warnw("DialArgs", "error", err)
return
}
ah := ai.AuthHeader()
v1api, closer, err := client.NewFullNodeRPCV1(ctx, da, ah)
if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s", ai.Addr)
return
}
defer closer()
ver, err := v1api.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}
head, err := v1api.ChainHead(ctx)
if err != nil {
log.Warnw("ChainHead", "error", err)
return
}
var syncState string
switch {
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*3/2): // within 1.5 epochs
syncState = "ok"
case time.Now().Unix()-int64(head.MinTimestamp()) < int64(build.BlockDelaySecs*5): // within 5 epochs
syncState = fmt.Sprintf("slow (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
default:
syncState = fmt.Sprintf("behind (%s behind)", time.Since(time.Unix(int64(head.MinTimestamp()), 0)).Truncate(time.Second))
}
myinfo = RpcInfo{
Address: ai.Addr,
CLayers: clayers,
Reachable: true,
Version: ver.Version,
SyncState: syncState,
}
}()
}
wg.Wait()
var infoList []RpcInfo
for _, i := range infos {
infoList = append(infoList, i)
}
sort.Slice(infoList, func(i, j int) bool {
return infoList[i].Address < infoList[j].Address
})
return infoList, nil
}

View File

@ -1,22 +1,25 @@
import { LitElement, html, css } from 'https://cdn.jsdelivr.net/gh/lit/dist@3/all/lit-all.min.js';
import RPCCall from '/lib/jsonrpc.mjs';
window.customElements.define('chain-connectivity', class MyElement extends LitElement {
constructor() {
super();
this.data = [];
this.loadData();
}
loadData() {
const eventSource = new EventSource('/api/debug/chain-state-sse');
eventSource.onmessage = (event) => {
this.data = JSON.parse(event.data);
super.requestUpdate();
};
eventSource.onerror = (error) => {
console.error('Error:', error);
loadData();
};
async loadData() {
const blockDelay = await RPCCall('BlockDelaySecs')
await this.updateData();
setInterval(this.update, blockDelay * 1000);
};
async updateData() {
this.data = await RPCCall('SyncerState');
console.log(this.data);
super.requestUpdate();
}
static get styles() {
return [css`
:host {

View File

@ -0,0 +1,96 @@
class JsonRpcClient {
static instance = null;
static async getInstance() {
if (!JsonRpcClient.instance) {
JsonRpcClient.instance = (async () => {
const client = new JsonRpcClient('/api/webrpc/v0');
await client.connect();
return client;
})();
}
return await JsonRpcClient.instance;
}
constructor(url) {
if (JsonRpcClient.instance) {
throw new Error("Error: Instantiation failed: Use getInstance() instead of new.");
}
this.url = url;
this.requestId = 0;
this.pendingRequests = new Map();
}
async connect() {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log("Connected to the server");
resolve();
};
this.ws.onclose = () => {
console.log("Connection closed, attempting to reconnect...");
setTimeout(() => this.connect().then(resolve, reject), 1000); // Reconnect after 1 second
};
this.ws.onerror = (error) => {
console.error("WebSocket error:", error);
reject(error);
};
this.ws.onmessage = (message) => {
this.handleMessage(message);
};
});
}
handleMessage(message) {
const response = JSON.parse(message.data);
const { id, result, error } = response;
const resolver = this.pendingRequests.get(id);
if (resolver) {
if (error) {
resolver.reject(error);
} else {
resolver.resolve(result);
}
this.pendingRequests.delete(id);
}
}
call(method, params = []) {
const id = ++this.requestId;
const request = {
jsonrpc: "2.0",
method: "CurioWeb." + method,
params,
id,
};
return new Promise((resolve, reject) => {
this.pendingRequests.set(id, { resolve, reject });
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(request));
} else {
reject('WebSocket is not open');
}
});
}
}
async function init() {
const client = await JsonRpcClient.getInstance();
console.log("webrpc backend:", await client.call('Version', []))
}
init();
export default async function(method, params = []) {
const i = await JsonRpcClient.getInstance();
return await i.call(method, params);
}