v1.27.0-a #10

Closed
jonathanface wants to merge 473 commits from v1.27.0-a into master
10 changed files with 435 additions and 23 deletions
Showing only changes of commit 099fc04d0a - Show all commits

View File

@ -2,12 +2,13 @@ package hapi
import ( import (
"embed" "embed"
"html/template" "text/template"
"github.com/gorilla/mux" "github.com/gorilla/mux"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/cmd/curio/deps" "github.com/filecoin-project/lotus/cmd/curio/deps"
) )
@ -15,7 +16,7 @@ import (
var templateFS embed.FS var templateFS embed.FS
func Routes(r *mux.Router, deps *deps.Deps) error { func Routes(r *mux.Router, deps *deps.Deps) error {
t, err := template.ParseFS(templateFS, "web/*") t, err := makeTemplate().ParseFS(templateFS, "web/*")
if err != nil { if err != nil {
return xerrors.Errorf("parse templates: %w", err) return xerrors.Errorf("parse templates: %w", err)
} }
@ -28,6 +29,7 @@ func Routes(r *mux.Router, deps *deps.Deps) error {
go a.watchRpc() go a.watchRpc()
go a.watchActor() go a.watchActor()
// index page (simple info)
r.HandleFunc("/simpleinfo/actorsummary", a.actorSummary) r.HandleFunc("/simpleinfo/actorsummary", a.actorSummary)
r.HandleFunc("/simpleinfo/machines", a.indexMachines) r.HandleFunc("/simpleinfo/machines", a.indexMachines)
r.HandleFunc("/simpleinfo/tasks", a.indexTasks) r.HandleFunc("/simpleinfo/tasks", a.indexTasks)
@ -35,8 +37,19 @@ func Routes(r *mux.Router, deps *deps.Deps) error {
r.HandleFunc("/simpleinfo/pipeline-porep", a.indexPipelinePorep) r.HandleFunc("/simpleinfo/pipeline-porep", a.indexPipelinePorep)
// pipeline-porep page // pipeline-porep page
r.HandleFunc("/simpleinfo/pipeline-porep/sectors", a.pipelinePorepSectors) r.HandleFunc("/pipeline-porep/sectors", a.pipelinePorepSectors)
// node info page
r.HandleFunc("/node/{id}", a.nodeInfo)
return nil return nil
} }
func makeTemplate() *template.Template {
return template.New("").Funcs(template.FuncMap{
"toHumanBytes": func(b int64) string {
return types.SizeStr(types.NewInt(uint64(b)))
},
})
}
var log = logging.Logger("curio/web") var log = logging.Logger("curio/web")

View File

@ -1,14 +1,18 @@
package hapi package hapi
import ( import (
"bytes"
"context" "context"
"html/template" "fmt"
"net/http" "net/http"
"os" "os"
"sort" "sort"
"strconv"
"sync" "sync"
"text/template"
"time" "time"
"github.com/gorilla/mux"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/api/v1api" "github.com/filecoin-project/lotus/api/v1api"
@ -99,12 +103,37 @@ func (a *app) indexPipelinePorep(w http.ResponseWriter, r *http.Request) {
a.executeTemplate(w, "pipeline_porep", s) a.executeTemplate(w, "pipeline_porep", s)
} }
func (a *app) nodeInfo(writer http.ResponseWriter, request *http.Request) {
params := mux.Vars(request)
id, ok := params["id"]
if !ok {
http.Error(writer, "missing id", http.StatusBadRequest)
return
}
intid, err := strconv.ParseInt(id, 10, 64)
if err != nil {
http.Error(writer, "invalid id", http.StatusBadRequest)
return
}
mi, err := a.clusterNodeInfo(request.Context(), intid)
if err != nil {
log.Errorf("machine info: %v", err)
http.Error(writer, "internal server error", http.StatusInternalServerError)
return
}
a.executePageTemplate(writer, "node_info", "Node Info", mi)
}
var templateDev = os.Getenv("LOTUS_WEB_DEV") == "1" var templateDev = os.Getenv("LOTUS_WEB_DEV") == "1"
func (a *app) executeTemplate(w http.ResponseWriter, name string, data interface{}) { func (a *app) executeTemplate(w http.ResponseWriter, name string, data interface{}) {
if templateDev { if templateDev {
fs := os.DirFS("./cmd/curio/web/hapi/web") fs := os.DirFS("./curiosrc/web/hapi/web")
a.t = template.Must(template.ParseFS(fs, "*")) a.t = template.Must(makeTemplate().ParseFS(fs, "*"))
} }
if err := a.t.ExecuteTemplate(w, name, data); err != nil { if err := a.t.ExecuteTemplate(w, name, data); err != nil {
log.Errorf("execute template %s: %v", name, err) log.Errorf("execute template %s: %v", name, err)
@ -112,6 +141,22 @@ func (a *app) executeTemplate(w http.ResponseWriter, name string, data interface
} }
} }
func (a *app) executePageTemplate(w http.ResponseWriter, name, title string, data interface{}) {
if templateDev {
fs := os.DirFS("./curiosrc/web/hapi/web")
a.t = template.Must(makeTemplate().ParseFS(fs, "*"))
}
var contentBuf bytes.Buffer
if err := a.t.ExecuteTemplate(&contentBuf, name, data); err != nil {
log.Errorf("execute template %s: %v", name, err)
http.Error(w, "internal server error", http.StatusInternalServerError)
}
a.executeTemplate(w, "root", map[string]interface{}{
"PageTitle": title,
"Content": contentBuf.String(),
})
}
type machineRecentTask struct { type machineRecentTask struct {
TaskName string TaskName string
Success int64 Success int64
@ -127,10 +172,10 @@ type machineSummary struct {
} }
type taskSummary struct { type taskSummary struct {
Name string Name string
SincePosted string SincePosted string
Owner *string Owner, OwnerID *string
ID int64 ID int64
} }
type taskHistorySummary struct { type taskHistorySummary struct {
@ -219,7 +264,7 @@ func (a *app) clusterMachineSummary(ctx context.Context) ([]machineSummary, erro
} }
func (a *app) clusterTaskSummary(ctx context.Context) ([]taskSummary, error) { func (a *app) clusterTaskSummary(ctx context.Context) ([]taskSummary, error) {
rows, err := a.db.Query(ctx, "SELECT id, name, update_time, owner_id FROM harmony_task order by update_time asc, owner_id") rows, err := a.db.Query(ctx, "SELECT t.id, t.name, t.update_time, t.owner_id, hm.host_and_port FROM harmony_task t LEFT JOIN curio.harmony_machines hm ON hm.id = t.owner_id ORDER BY t.update_time ASC, t.owner_id")
if err != nil { if err != nil {
return nil, err // Handle error return nil, err // Handle error
} }
@ -230,7 +275,7 @@ func (a *app) clusterTaskSummary(ctx context.Context) ([]taskSummary, error) {
var t taskSummary var t taskSummary
var posted time.Time var posted time.Time
if err := rows.Scan(&t.ID, &t.Name, &posted, &t.Owner); err != nil { if err := rows.Scan(&t.ID, &t.Name, &posted, &t.OwnerID, &t.Owner); err != nil {
return nil, err // Handle error return nil, err // Handle error
} }
@ -321,3 +366,232 @@ func (a *app) porepPipelineSummary(ctx context.Context) ([]porepPipelineSummary,
} }
return summaries, nil return summaries, nil
} }
type machineInfo struct {
Info struct {
Host string
ID int64
LastContact string
CPU int64
Memory int64
GPU int64
}
// Storage
Storage []struct {
ID string
Weight int64
MaxStorage int64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
Capacity int64
Available int64
FSAvailable int64
Reserved int64
Used int64
LastHeartbeat time.Time
HeartbeatErr *string
UsedPercent float64
ReservedPercent float64
}
/*TotalStorage struct {
MaxStorage int64
UsedStorage int64
MaxSealStorage int64
UsedSealStorage int64
MaxStoreStorage int64
UsedStoreStorage int64
}*/
// Tasks
RunningTasks []struct {
ID int64
Task string
Posted string
PoRepSector, PoRepSectorSP *int64
}
FinishedTasks []struct {
ID int64
Task string
Posted string
Start string
Queued string
Took string
Outcome string
Message string
}
}
func (a *app) clusterNodeInfo(ctx context.Context, id int64) (*machineInfo, error) {
rows, err := a.db.Query(ctx, "SELECT id, host_and_port, last_contact, cpu, ram, gpu FROM harmony_machines WHERE id=$1 ORDER BY host_and_port ASC", id)
if err != nil {
return nil, err // Handle error
}
defer rows.Close()
var summaries []machineInfo
if rows.Next() {
var m machineInfo
var lastContact time.Time
if err := rows.Scan(&m.Info.ID, &m.Info.Host, &lastContact, &m.Info.CPU, &m.Info.Memory, &m.Info.GPU); err != nil {
return nil, err
}
m.Info.LastContact = time.Since(lastContact).Round(time.Second).String()
summaries = append(summaries, m)
}
if len(summaries) == 0 {
return nil, xerrors.Errorf("machine not found")
}
// query storage info
rows2, err := a.db.Query(ctx, "SELECT storage_id, weight, max_storage, can_seal, can_store, groups, allow_to, allow_types, deny_types, capacity, available, fs_available, reserved, used, last_heartbeat, heartbeat_err FROM storage_path WHERE urls LIKE '%' || $1 || '%'", summaries[0].Info.Host)
if err != nil {
return nil, err
}
defer rows2.Close()
for rows2.Next() {
var s struct {
ID string
Weight int64
MaxStorage int64
CanSeal bool
CanStore bool
Groups string
AllowTo string
AllowTypes string
DenyTypes string
Capacity int64
Available int64
FSAvailable int64
Reserved int64
Used int64
LastHeartbeat time.Time
HeartbeatErr *string
UsedPercent float64
ReservedPercent float64
}
if err := rows2.Scan(&s.ID, &s.Weight, &s.MaxStorage, &s.CanSeal, &s.CanStore, &s.Groups, &s.AllowTo, &s.AllowTypes, &s.DenyTypes, &s.Capacity, &s.Available, &s.FSAvailable, &s.Reserved, &s.Used, &s.LastHeartbeat, &s.HeartbeatErr); err != nil {
return nil, err
}
s.UsedPercent = float64(s.Capacity-s.FSAvailable) * 100 / float64(s.Capacity)
s.ReservedPercent = float64(s.Capacity-(s.FSAvailable+s.Reserved))*100/float64(s.Capacity) - s.UsedPercent
summaries[0].Storage = append(summaries[0].Storage, s)
}
// tasks
rows3, err := a.db.Query(ctx, "SELECT id, name, posted_time FROM harmony_task WHERE owner_id=$1", summaries[0].Info.ID)
if err != nil {
return nil, err
}
defer rows3.Close()
for rows3.Next() {
var t struct {
ID int64
Task string
Posted string
PoRepSector *int64
PoRepSectorSP *int64
}
var posted time.Time
if err := rows3.Scan(&t.ID, &t.Task, &posted); err != nil {
return nil, err
}
t.Posted = time.Since(posted).Round(time.Second).String()
{
// try to find in the porep pipeline
rows4, err := a.db.Query(ctx, `SELECT sp_id, sector_number FROM sectors_sdr_pipeline
WHERE task_id_sdr=$1
OR task_id_tree_d=$1
OR task_id_tree_c=$1
OR task_id_tree_r=$1
OR task_id_precommit_msg=$1
OR task_id_porep=$1
OR task_id_commit_msg=$1
OR task_id_finalize=$1
OR task_id_move_storage=$1
`, t.ID)
if err != nil {
return nil, err
}
if rows4.Next() {
var spid int64
var sector int64
if err := rows4.Scan(&spid, &sector); err != nil {
return nil, err
}
t.PoRepSector = &sector
t.PoRepSectorSP = &spid
}
rows4.Close()
}
summaries[0].RunningTasks = append(summaries[0].RunningTasks, t)
}
rows5, err := a.db.Query(ctx, `SELECT name, task_id, posted, work_start, work_end, result, err FROM harmony_task_history WHERE completed_by_host_and_port = $1 ORDER BY work_end DESC LIMIT 15`, summaries[0].Info.Host)
if err != nil {
return nil, err
}
defer rows5.Close()
for rows5.Next() {
var ft struct {
ID int64
Task string
Posted string
Start string
Queued string
Took string
Outcome string
Message string
}
var posted, start, end time.Time
var result bool
if err := rows5.Scan(&ft.Task, &ft.ID, &posted, &start, &end, &result, &ft.Message); err != nil {
return nil, err
}
ft.Outcome = "Success"
if !result {
ft.Outcome = "Failed"
}
// Format the times and durations
ft.Posted = posted.Format("02 Jan 06 15:04 MST")
ft.Start = start.Format("02 Jan 06 15:04 MST")
ft.Queued = fmt.Sprintf("%s", start.Sub(posted).Round(time.Second).String())
ft.Took = fmt.Sprintf("%s", end.Sub(start).Round(time.Second))
summaries[0].FinishedTasks = append(summaries[0].FinishedTasks, ft)
}
return &summaries[0], nil
}

View File

@ -1,7 +1,7 @@
{{define "cluster_machines"}} {{define "cluster_machines"}}
{{range .}} {{range .}}
<tr> <tr>
<td>{{.Address}}</td> <td><a href="/hapi/node/{{.ID}}">{{.Address}}</a></td>
<td>{{.ID}}</td> <td>{{.ID}}</td>
<td>{{.SinceContact}}</td> <td>{{.SinceContact}}</td>
{{range .RecentTasks}} {{range .RecentTasks}}

View File

@ -4,7 +4,7 @@
<td>{{.Name}}</td> <td>{{.Name}}</td>
<td>{{.ID}}</td> <td>{{.ID}}</td>
<td>{{.SincePosted}}</td> <td>{{.SincePosted}}</td>
<td>{{.Owner}}</td> <td>{{if ne nil .OwnerID}}<a href="/hapi/node/{{.OwnerID}}">{{.Owner}}</a>{{end}}</td>
</tr> </tr>
{{end}} {{end}}
{{end}} {{end}}

View File

@ -0,0 +1,100 @@
{{define "node_info"}}
<h2>Info</h2>
<table>
<tr>
<td>Host</td>
<td>ID</td>
<td>Last Contact</td>
<td>CPU</td>
<td>Memory</td>
<td>GPU</td>
<td>Debug</td>
</tr>
<tr>
<td>{{.Info.Host}}</td>
<td>{{.Info.ID}}</td>
<td>{{.Info.LastContact}}</td>
<td>{{.Info.CPU}}</td>
<td>{{toHumanBytes .Info.Memory}}</td>
<td>{{.Info.GPU}}</td>
<td><a href="http://{{.Info.Host}}/debug/pprof">[pprof]</a></td>
</tr>
</table>
<hr>
<h2>Storage</h2>
<table>
<tr>
<td>ID</td>
<td>Type</td>
<td>Capacity</td>
<td>Available</td>
<td>Reserved</td>
<td></td>
</tr>
{{range .Storage}}
<tr>
<td>{{.ID}}</td>
<td>
{{if and (not .CanSeal) (not .CanStore)}}ReadOnly{{end}}
{{if and (.CanSeal) (not .CanStore)}}Seal{{end}}
{{if and (not .CanSeal) (.CanStore)}}Store{{end}}
{{if and (.CanSeal) (.CanStore)}}Seal+Store{{end}}
</td>
<td>{{toHumanBytes .Capacity}}</td>
<td>{{toHumanBytes .Available}}</td>
<td>{{toHumanBytes .Reserved}}</td>
<td>
<div style="width: 200px; height: 10px; background-color: black; border-bottom: 1px solid darkgrey">
<div style="width: {{.UsedPercent}}%; height: 10px; background-color: green"></div>
<div style="width: {{.ReservedPercent}}%; height: 10px; background-color: red"></div>
</div>
</td>
</tr>
{{end}}
</table>
<hr>
<h2>Tasks</h2>
<h3>Running</h3>
<table>
<tr>
<td>ID</td>
<td>Task</td>
<td>Posted</td>
<td>Sector</td>
</tr>
{{range .RunningTasks}}
<tr>
<td>{{.ID}}</td>
<td>{{.Task}}</td>
<td>{{.Posted}}</td>
<td>{{if ne nil .PoRepSector}}<a href="/pipeline_porep.html">f0{{.PoRepSectorSP}}:{{.PoRepSector}}</a>{{end}}</td>
</tr>
{{end}}
</table>
<h3>Recently Finished</h3>
<table>
<tr>
<td>ID</td>
<td>Task</td>
<td>Posted</td>
<td>Start</td>
<td>Queued</td>
<td>Took</td>
<td>Outcome</td>
<td>Message</td>
</tr>
{{range .FinishedTasks}}
<tr>
<td>{{.ID}}</td>
<td>{{.Task}}</td>
<td>{{.Posted}}</td>
<td>{{.Start}}</td>
<td>{{.Queued}}</td>
<td>{{.Took}}</td>
<td>{{.Outcome}}</td>
<td>{{.Message}}</td>
</tr>
{{end}}
</table>
{{end}}

View File

@ -0,0 +1,25 @@
{{define "root"}}
<html>
<head>
<title>{{.PageTitle}}</title>
<script src="https://unpkg.com/htmx.org@1.9.5" integrity="sha384-xcuj3WpfgjlKF+FXhSQFQ0ZNr39ln+hwjN3npfM9VBnUskLolQAcN80McRIVOPuO" crossorigin="anonymous"></script>
<script type="module" src="chain-connectivity.js"></script>
<link rel="stylesheet" href="/main.css">
<link rel='stylesheet' href='https://cdn.jsdelivr.net/npm/hack-font@3.3.0/build/web/hack-subset.css'>
</head>
<body>
<div class="app-head">
<div class="head-left">
<h1>{{.PageTitle}}</h1>
</div>
<div class="head-right">
version [todo]
</div>
</div>
<hr/>
<div class="page">
{{.Content}}
</div>
</body>
</html>
{{end}}

View File

@ -41,8 +41,8 @@ func GetSrv(ctx context.Context, deps *deps.Deps) (*http.Server, error) {
var static fs.FS = static var static fs.FS = static
if webDev { if webDev {
basePath = "cmd/curio/web/static" basePath = ""
static = os.DirFS(basePath) static = os.DirFS("curiosrc/web/static")
} }
mx.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mx.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View File

@ -35,16 +35,16 @@ table tr td:first-child, table tr th:first-child {
border-left: none; border-left: none;
} }
a:link { a {
color: #cfc; text-decoration: none;
} }
a:visited { a:link, a:visited {
color: #dfa; color: #adf7ad;
} }
a:hover { a:hover {
color: #af7; color: #88cc60;
} }
.success { .success {

View File

@ -75,7 +75,7 @@
<th>State</th> <th>State</th>
</tr> </tr>
</thead> </thead>
<tbody hx-get="/hapi/simpleinfo/pipeline-porep/sectors" hx-trigger="load,every 3s"> <tbody hx-get="/hapi/pipeline-porep/sectors" hx-trigger="load,every 3s">
</tbody> </tbody>
</table> </table>
</div> </div>

View File

@ -178,7 +178,7 @@ func New(
} }
} }
if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) { if !h.considerWork(workSourceRecover, []TaskID{TaskID(w.ID)}) {
log.Error("Strange: Unable to accept previously owned task: ", w.ID, w.Name) log.Errorw("Strange: Unable to accept previously owned task", "id", w.ID, "type", w.Name)
} }
} }
} }