lotus/curiosrc/web/hapi/simpleinfo.go
2024-04-04 18:06:19 +02:00

602 lines
16 KiB
Go

package hapi
import (
"bytes"
"context"
"fmt"
"net/http"
"os"
"sort"
"strconv"
"sync"
"text/template"
"time"
"github.com/gorilla/mux"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api/v1api"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)
type app struct {
db *harmonydb.DB
t *template.Template
rpcInfoLk sync.Mutex
workingApi v1api.FullNode
actorInfoLk sync.Mutex
actorInfos []actorInfo
}
type actorInfo struct {
Address string
CLayers []string
QualityAdjustedPower string
RawBytePower string
ActorBalance, ActorAvailable, WorkerBalance string
Win1, Win7, Win30 int64
Deadlines []actorDeadline
}
type actorDeadline struct {
Empty bool
Current bool
Proven bool
PartFaulty bool
Faulty bool
}
func (a *app) actorSummary(w http.ResponseWriter, r *http.Request) {
a.actorInfoLk.Lock()
defer a.actorInfoLk.Unlock()
a.executeTemplate(w, "actor_summary", a.actorInfos)
}
func (a *app) indexMachines(w http.ResponseWriter, r *http.Request) {
s, err := a.clusterMachineSummary(r.Context())
if err != nil {
log.Errorf("cluster machine summary: %v", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
a.executeTemplate(w, "cluster_machines", s)
}
func (a *app) indexTasks(w http.ResponseWriter, r *http.Request) {
s, err := a.clusterTaskSummary(r.Context())
if err != nil {
log.Errorf("cluster task summary: %v", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
a.executeTemplate(w, "cluster_tasks", s)
}
func (a *app) indexTasksHistory(w http.ResponseWriter, r *http.Request) {
s, err := a.clusterTaskHistorySummary(r.Context())
if err != nil {
log.Errorf("cluster task history summary: %v", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
a.executeTemplate(w, "cluster_task_history", s)
}
func (a *app) indexPipelinePorep(w http.ResponseWriter, r *http.Request) {
s, err := a.porepPipelineSummary(r.Context())
if err != nil {
log.Errorf("porep pipeline summary: %v", err)
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
a.executeTemplate(w, "pipeline_porep", s)
}
func (a *app) nodeInfo(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)
id, ok := params["id"]
if !ok {
http.Error(writer, "missing id", http.StatusBadRequest)
return
}
intid, err := strconv.ParseInt(id, 10, 64)
if err != nil {
http.Error(writer, "invalid id", http.StatusBadRequest)
return
}
mi, err := a.clusterNodeInfo(request.Context(), intid)
if err != nil {
log.Errorf("machine info: %v", err)
http.Error(writer, "internal server error", http.StatusInternalServerError)
return
}
a.executePageTemplate(writer, "node_info", "Node Info", mi)
}
var templateDev = os.Getenv("LOTUS_WEB_DEV") == "1"
func (a *app) executeTemplate(w http.ResponseWriter, name string, data interface{}) {
if templateDev {
fs := os.DirFS("./curiosrc/web/hapi/web")
a.t = template.Must(makeTemplate().ParseFS(fs, "*"))
}
if err := a.t.ExecuteTemplate(w, name, data); err != nil {
log.Errorf("execute template %s: %v", name, err)
http.Error(w, "internal server error", http.StatusInternalServerError)
}
}
func (a *app) executePageTemplate(w http.ResponseWriter, name, title string, data interface{}) {
if templateDev {
fs := os.DirFS("./curiosrc/web/hapi/web")
a.t = template.Must(makeTemplate().ParseFS(fs, "*"))
}
var contentBuf bytes.Buffer
if err := a.t.ExecuteTemplate(&contentBuf, name, data); err != nil {
log.Errorf("execute template %s: %v", name, err)
http.Error(w, "internal server error", http.StatusInternalServerError)
}
a.executeTemplate(w, "root", map[string]interface{}{
"PageTitle": title,
"Content": contentBuf.String(),
})
}
type machineRecentTask struct {
TaskName string
Success int64
Fail int64
}
type machineSummary struct {
Address string
ID int64
SinceContact string
RecentTasks []*machineRecentTask
}
type taskSummary struct {
Name string
SincePosted string
Owner, OwnerID *string
ID int64
}
type taskHistorySummary struct {
Name string
TaskID int64
Posted, Start, Queued, Took string
Result bool
Err string
CompletedBy string
}
func (a *app) clusterMachineSummary(ctx context.Context) ([]machineSummary, error) {
// First get task summary for tasks completed in the last 24 hours
// NOTE: This query uses harmony_task_history_work_index, task history may get big
tsrows, err := a.db.Query(ctx, `SELECT hist.completed_by_host_and_port, hist.name, hist.result, count(1) FROM harmony_task_history hist
WHERE hist.work_end > now() - INTERVAL '1 day'
GROUP BY hist.completed_by_host_and_port, hist.name, hist.result
ORDER BY completed_by_host_and_port ASC`)
if err != nil {
return nil, err
}
defer tsrows.Close()
// Map of machine -> task -> recent task
taskSummaries := map[string]map[string]*machineRecentTask{}
for tsrows.Next() {
var taskName string
var result bool
var count int64
var machine string
if err := tsrows.Scan(&machine, &taskName, &result, &count); err != nil {
return nil, err
}
if _, ok := taskSummaries[machine]; !ok {
taskSummaries[machine] = map[string]*machineRecentTask{}
}
if _, ok := taskSummaries[machine][taskName]; !ok {
taskSummaries[machine][taskName] = &machineRecentTask{TaskName: taskName}
}
if result {
taskSummaries[machine][taskName].Success = count
} else {
taskSummaries[machine][taskName].Fail = count
}
}
// Then machine summary
rows, err := a.db.Query(ctx, "SELECT id, host_and_port, last_contact FROM harmony_machines order by host_and_port asc")
if err != nil {
return nil, err // Handle error
}
defer rows.Close()
var summaries []machineSummary
for rows.Next() {
var m machineSummary
var lastContact time.Time
if err := rows.Scan(&m.ID, &m.Address, &lastContact); err != nil {
return nil, err // Handle error
}
m.SinceContact = time.Since(lastContact).Round(time.Second).String()
// Add recent tasks
if ts, ok := taskSummaries[m.Address]; ok {
for _, t := range ts {
m.RecentTasks = append(m.RecentTasks, t)
}
sort.Slice(m.RecentTasks, func(i, j int) bool {
return m.RecentTasks[i].TaskName < m.RecentTasks[j].TaskName
})
}
summaries = append(summaries, m)
}
return summaries, nil
}
func (a *app) clusterTaskSummary(ctx context.Context) ([]taskSummary, error) {
rows, err := a.db.Query(ctx, "SELECT t.id, t.name, t.update_time, t.owner_id, hm.host_and_port FROM harmony_task t LEFT JOIN curio.harmony_machines hm ON hm.id = t.owner_id ORDER BY t.update_time ASC, t.owner_id")
if err != nil {
return nil, err // Handle error
}
defer rows.Close()
var summaries []taskSummary
for rows.Next() {
var t taskSummary
var posted time.Time
if err := rows.Scan(&t.ID, &t.Name, &posted, &t.OwnerID, &t.Owner); err != nil {
return nil, err // Handle error
}
t.SincePosted = time.Since(posted).Round(time.Second).String()
summaries = append(summaries, t)
}
return summaries, nil
}
func (a *app) clusterTaskHistorySummary(ctx context.Context) ([]taskHistorySummary, error) {
rows, err := a.db.Query(ctx, "SELECT id, name, task_id, posted, work_start, work_end, result, err, completed_by_host_and_port FROM harmony_task_history ORDER BY work_end DESC LIMIT 15")
if err != nil {
return nil, err // Handle error
}
defer rows.Close()
var summaries []taskHistorySummary
for rows.Next() {
var t taskHistorySummary
var posted, start, end time.Time
if err := rows.Scan(&t.TaskID, &t.Name, &t.TaskID, &posted, &start, &end, &t.Result, &t.Err, &t.CompletedBy); err != nil {
return nil, err // Handle error
}
t.Posted = posted.Local().Round(time.Second).Format("02 Jan 06 15:04")
t.Start = start.Local().Round(time.Second).Format("02 Jan 06 15:04")
//t.End = end.Local().Round(time.Second).Format("02 Jan 06 15:04")
t.Queued = start.Sub(posted).Round(time.Second).String()
if t.Queued == "0s" {
t.Queued = start.Sub(posted).Round(time.Millisecond).String()
}
t.Took = end.Sub(start).Round(time.Second).String()
if t.Took == "0s" {
t.Took = end.Sub(start).Round(time.Millisecond).String()
}
summaries = append(summaries, t)
}
return summaries, nil
}
type porepPipelineSummary struct {
Actor string
CountSDR int
CountTrees int
CountPrecommitMsg int
CountWaitSeed int
CountPoRep int
CountCommitMsg int
CountDone int
CountFailed int
}
func (a *app) porepPipelineSummary(ctx context.Context) ([]porepPipelineSummary, error) {
rows, err := a.db.Query(ctx, `
SELECT
sp_id,
COUNT(*) FILTER (WHERE after_sdr = false) as CountSDR,
COUNT(*) FILTER (WHERE (after_tree_d = false OR after_tree_c = false OR after_tree_r = false) AND after_sdr = true) as CountTrees,
COUNT(*) FILTER (WHERE after_tree_r = true and after_precommit_msg = false) as CountPrecommitMsg,
COUNT(*) FILTER (WHERE after_precommit_msg_success = false AND after_precommit_msg = true) as CountWaitSeed,
COUNT(*) FILTER (WHERE after_porep = false AND after_precommit_msg_success = true) as CountPoRep,
COUNT(*) FILTER (WHERE after_commit_msg_success = false AND after_porep = true) as CountCommitMsg,
COUNT(*) FILTER (WHERE after_commit_msg_success = true) as CountDone,
COUNT(*) FILTER (WHERE failed = true) as CountFailed
FROM
sectors_sdr_pipeline
GROUP BY sp_id`)
if err != nil {
return nil, xerrors.Errorf("query: %w", err)
}
defer rows.Close()
var summaries []porepPipelineSummary
for rows.Next() {
var summary porepPipelineSummary
if err := rows.Scan(&summary.Actor, &summary.CountSDR, &summary.CountTrees, &summary.CountPrecommitMsg, &summary.CountWaitSeed, &summary.CountPoRep, &summary.CountCommitMsg, &summary.CountDone, &summary.CountFailed); err != nil {
return nil, xerrors.Errorf("scan: %w", err)
}
summary.Actor = "f0" + summary.Actor
summaries = append(summaries, summary)
}
return summaries, nil
}
type machineInfo struct {
Info struct {
Host string
ID int64
LastContact string
CPU int64
Memory int64
GPU int64
}
// Storage
Storage []struct {
ID string
Weight int64
MaxStorage int64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
Capacity int64
Available int64
FSAvailable int64
Reserved int64
Used int64
AllowMiners string
DenyMiners string
LastHeartbeat time.Time
HeartbeatErr *string
UsedPercent float64
ReservedPercent float64
}
/*TotalStorage struct {
MaxStorage int64
UsedStorage int64
MaxSealStorage int64
UsedSealStorage int64
MaxStoreStorage int64
UsedStoreStorage int64
}*/
// Tasks
RunningTasks []struct {
ID int64
Task string
Posted string
PoRepSector, PoRepSectorSP *int64
}
FinishedTasks []struct {
ID int64
Task string
Posted string
Start string
Queued string
Took string
Outcome string
Message string
}
}
func (a *app) clusterNodeInfo(ctx context.Context, id int64) (*machineInfo, error) {
rows, err := a.db.Query(ctx, "SELECT id, host_and_port, last_contact, cpu, ram, gpu FROM harmony_machines WHERE id=$1 ORDER BY host_and_port ASC", id)
if err != nil {
return nil, err // Handle error
}
defer rows.Close()
var summaries []machineInfo
if rows.Next() {
var m machineInfo
var lastContact time.Time
if err := rows.Scan(&m.Info.ID, &m.Info.Host, &lastContact, &m.Info.CPU, &m.Info.Memory, &m.Info.GPU); err != nil {
return nil, err
}
m.Info.LastContact = time.Since(lastContact).Round(time.Second).String()
summaries = append(summaries, m)
}
if len(summaries) == 0 {
return nil, xerrors.Errorf("machine not found")
}
// query storage info
rows2, err := a.db.Query(ctx, "SELECT storage_id, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types, capacity, available, fs_available, reserved, used, allow_miners, deny_miners, last_heartbeat, heartbeat_err FROM storage_path WHERE urls LIKE '%' || $1 || '%'", summaries[0].Info.Host)
if err != nil {
return nil, err
}
defer rows2.Close()
for rows2.Next() {
var s struct {
ID string
Weight int64
MaxStorage int64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
Capacity int64
Available int64
FSAvailable int64
Reserved int64
Used int64
AllowMiners string
DenyMiners string
LastHeartbeat time.Time
HeartbeatErr *string
UsedPercent float64
ReservedPercent float64
}
if err := rows2.Scan(&s.ID, &s.Weight, &s.MaxStorage, &s.CanSeal, &s.CanStore, &s.Groups, &s.AllowTo, &s.AllowTypes, &s.DenyTypes, &s.Capacity, &s.Available, &s.FSAvailable, &s.Reserved, &s.Used, &s.AllowMiners, &s.DenyMiners, &s.LastHeartbeat, &s.HeartbeatErr); err != nil {
return nil, err
}
s.UsedPercent = float64(s.Capacity-s.FSAvailable) * 100 / float64(s.Capacity)
s.ReservedPercent = float64(s.Capacity-(s.FSAvailable+s.Reserved))*100/float64(s.Capacity) - s.UsedPercent
summaries[0].Storage = append(summaries[0].Storage, s)
}
// tasks
rows3, err := a.db.Query(ctx, "SELECT id, name, posted_time FROM harmony_task WHERE owner_id=$1", summaries[0].Info.ID)
if err != nil {
return nil, err
}
defer rows3.Close()
for rows3.Next() {
var t struct {
ID int64
Task string
Posted string
PoRepSector *int64
PoRepSectorSP *int64
}
var posted time.Time
if err := rows3.Scan(&t.ID, &t.Task, &posted); err != nil {
return nil, err
}
t.Posted = time.Since(posted).Round(time.Second).String()
{
// try to find in the porep pipeline
rows4, err := a.db.Query(ctx, `SELECT sp_id, sector_number FROM sectors_sdr_pipeline
WHERE task_id_sdr=$1
OR task_id_tree_d=$1
OR task_id_tree_c=$1
OR task_id_tree_r=$1
OR task_id_precommit_msg=$1
OR task_id_porep=$1
OR task_id_commit_msg=$1
OR task_id_finalize=$1
OR task_id_move_storage=$1
`, t.ID)
if err != nil {
return nil, err
}
if rows4.Next() {
var spid int64
var sector int64
if err := rows4.Scan(&spid, &sector); err != nil {
return nil, err
}
t.PoRepSector = &sector
t.PoRepSectorSP = &spid
}
rows4.Close()
}
summaries[0].RunningTasks = append(summaries[0].RunningTasks, t)
}
rows5, err := a.db.Query(ctx, `SELECT name, task_id, posted, work_start, work_end, result, err FROM harmony_task_history WHERE completed_by_host_and_port = $1 ORDER BY work_end DESC LIMIT 15`, summaries[0].Info.Host)
if err != nil {
return nil, err
}
defer rows5.Close()
for rows5.Next() {
var ft struct {
ID int64
Task string
Posted string
Start string
Queued string
Took string
Outcome string
Message string
}
var posted, start, end time.Time
var result bool
if err := rows5.Scan(&ft.Task, &ft.ID, &posted, &start, &end, &result, &ft.Message); err != nil {
return nil, err
}
ft.Outcome = "Success"
if !result {
ft.Outcome = "Failed"
}
// Format the times and durations
ft.Posted = posted.Format("02 Jan 06 15:04 MST")
ft.Start = start.Format("02 Jan 06 15:04 MST")
ft.Queued = fmt.Sprintf("%s", start.Sub(posted).Round(time.Second).String())
ft.Took = fmt.Sprintf("%s", end.Sub(start).Round(time.Second))
summaries[0].FinishedTasks = append(summaries[0].FinishedTasks, ft)
}
return &summaries[0], nil
}