lotus/cmd/lotus-miner/sealing.go
2022-06-14 20:03:38 +02:00

498 lines
12 KiB
Go

package main
import (
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/dustin/go-humanize"
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/lib/httpreader"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
var sealingCmd = &cli.Command{
Name: "sealing",
Usage: "interact with sealing pipeline",
Subcommands: []*cli.Command{
sealingJobsCmd,
workersCmd(true),
sealingSchedDiagCmd,
sealingAbortCmd,
sealingDataCidCmd,
},
}
func workersCmd(sealing bool) *cli.Command {
return &cli.Command{
Name: "workers",
Usage: "list workers",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
color.NoColor = !cctx.Bool("color")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
stats, err := nodeApi.WorkerStats(ctx)
if err != nil {
return err
}
type sortableStat struct {
id uuid.UUID
storiface.WorkerStats
}
st := make([]sortableStat, 0, len(stats))
for id, stat := range stats {
if len(stat.Tasks) > 0 {
if (stat.Tasks[0].WorkerType() != sealtasks.WorkerSealing) == sealing {
continue
}
}
st = append(st, sortableStat{id, stat})
}
sort.Slice(st, func(i, j int) bool {
return st[i].id.String() < st[j].id.String()
})
/*
Example output:
Worker c4d65451-07f8-4230-98ad-4f33dea2a8cc, host myhostname
TASK: PC1(1/4) AP(15/15) GET(3)
CPU: [|||||||| ] 16/128 core(s) in use
RAM: [|||||||| ] 12% 125.8 GiB/1008 GiB
VMEM: [|||||||| ] 12% 125.8 GiB/1008 GiB
GPU: [ ] 0% 0.00/1 gpu(s) in use
GPU: NVIDIA GeForce RTX 3090, not used
*/
for _, stat := range st {
// Worker uuid + name
var disabled string
if !stat.Enabled {
disabled = color.RedString(" (disabled)")
}
fmt.Printf("Worker %s, host %s%s\n", stat.id, color.MagentaString(stat.Info.Hostname), disabled)
// Task counts
tc := make([][]string, 0, len(stat.TaskCounts))
for st, c := range stat.TaskCounts {
if c == 0 {
continue
}
stt, err := sealtasks.SttFromString(st)
if err != nil {
return err
}
str := fmt.Sprint(c)
if max := stat.Info.Resources.ResourceSpec(stt.RegisteredSealProof, stt.TaskType).MaxConcurrent; max > 0 {
switch {
case c < max:
str = color.GreenString(str)
case c >= max:
str = color.YellowString(str)
}
str = fmt.Sprintf("%s/%d", str, max)
} else {
str = color.CyanString(str)
}
str = fmt.Sprintf("%s(%s)", color.BlueString(stt.Short()), str)
tc = append(tc, []string{string(stt.TaskType), str})
}
sort.Slice(tc, func(i, j int) bool {
return sealtasks.TaskType(tc[i][0]).Less(sealtasks.TaskType(tc[j][0]))
})
var taskStr string
for _, t := range tc {
taskStr = t[1] + " "
}
if taskStr != "" {
fmt.Printf("\tTASK: %s\n", taskStr)
}
// CPU use
fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
lcli.BarString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
// RAM use
ramTotal := stat.Info.Resources.MemPhysical
ramTasks := stat.MemUsedMin
ramUsed := stat.Info.Resources.MemUsed
var ramReserved uint64 = 0
if ramUsed > ramTasks {
ramReserved = ramUsed - ramTasks
}
ramBar := lcli.BarString(float64(ramTotal), float64(ramReserved), float64(ramTasks))
fmt.Printf("\tRAM: [%s] %d%% %s/%s\n", ramBar,
(ramTasks+ramReserved)*100/stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(ramTasks+ramUsed)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
// VMEM use (ram+swap)
vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemTasks := stat.MemUsedMax
vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
var vmemReserved uint64 = 0
if vmemUsed > vmemTasks {
vmemReserved = vmemUsed - vmemTasks
}
vmemBar := lcli.BarString(float64(vmemTotal), float64(vmemReserved), float64(vmemTasks))
fmt.Printf("\tVMEM: [%s] %d%% %s/%s\n", vmemBar,
(vmemTasks+vmemReserved)*100/vmemTotal,
types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
types.SizeStr(types.NewInt(vmemTotal)))
// GPU use
if len(stat.Info.Resources.GPUs) > 0 {
gpuBar := lcli.BarString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)),
stat.GpuUsed, len(stat.Info.Resources.GPUs))
}
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}
}
return nil
},
}
}
var sealingJobsCmd = &cli.Command{
Name: "jobs",
Usage: "list running jobs",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
&cli.BoolFlag{
Name: "show-ret-done",
Usage: "show returned but not consumed calls",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
color.NoColor = !cctx.Bool("color")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
type line struct {
storiface.WorkerJob
wid uuid.UUID
}
lines := make([]line, 0)
for wid, jobs := range jobs {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: wid,
})
}
}
// oldest first
sort.Slice(lines, func(i, j int) bool {
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
if lines[i].Start.Equal(lines[j].Start) {
return lines[i].ID.ID.String() < lines[j].ID.ID.String()
}
return lines[i].Start.Before(lines[j].Start)
})
workerHostnames := map[uuid.UUID]string{}
wst, err := nodeApi.WorkerStats(ctx)
if err != nil {
return xerrors.Errorf("getting worker stats: %w", err)
}
for wid, st := range wst {
workerHostnames[wid] = st.Info.Hostname
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n")
for _, l := range lines {
state := "running"
switch {
case l.RunWait > 1:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
case l.RunWait == storiface.RWPrepared:
state = "prepared"
case l.RunWait == storiface.RWRetDone:
if !cctx.Bool("show-ret-done") {
continue
}
state = "ret-done"
case l.RunWait == storiface.RWReturned:
state = "returned"
case l.RunWait == storiface.RWRetWait:
state = "ret-wait"
}
dur := "n/a"
if !l.Start.IsZero() {
dur = time.Now().Sub(l.Start).Truncate(time.Millisecond * 100).String()
}
hostname, ok := workerHostnames[l.wid]
if !ok {
hostname = l.Hostname
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[:4]),
l.Sector.Number,
hex.EncodeToString(l.wid[:4]),
hostname,
l.Task.Short(),
state,
dur)
}
return tw.Flush()
},
}
var sealingSchedDiagCmd = &cli.Command{
Name: "sched-diag",
Usage: "Dump internal scheduler state",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "force-sched",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
st, err := nodeApi.SealingSchedDiag(ctx, cctx.Bool("force-sched"))
if err != nil {
return err
}
j, err := json.MarshalIndent(&st, "", " ")
if err != nil {
return err
}
fmt.Println(string(j))
return nil
},
}
var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[callid]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
var job *storiface.WorkerJob
outer:
for _, workerJobs := range jobs {
for _, j := range workerJobs {
if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) {
j := j
job = &j
break outer
}
}
}
if job == nil {
return xerrors.Errorf("job with specified id prefix not found")
}
fmt.Printf("aborting job %s, task %s, sector %d, running on host %s\n", job.ID.String(), job.Task.Short(), job.Sector.Number, job.Hostname)
return nodeApi.SealingAbort(ctx, job.ID)
},
}
var sealingDataCidCmd = &cli.Command{
Name: "data-cid",
Usage: "Compute data CID using workers",
ArgsUsage: "[file/url] <padded piece size>",
Flags: []cli.Flag{
&cli.Uint64Flag{
Name: "file-size",
Usage: "real file size",
},
},
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() < 1 || cctx.Args().Len() > 2 {
return xerrors.Errorf("expected 1 or 2 arguments")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
var r io.Reader
sz := cctx.Uint64("file-size")
if strings.HasPrefix(cctx.Args().First(), "http://") || strings.HasPrefix(cctx.Args().First(), "https://") {
r = &httpreader.HttpReader{
URL: cctx.Args().First(),
}
if !cctx.IsSet("file-size") {
resp, err := http.Head(cctx.Args().First())
if err != nil {
return xerrors.Errorf("http head: %w", err)
}
if resp.ContentLength < 0 {
return xerrors.Errorf("head response didn't contain content length; specify --file-size")
}
sz = uint64(resp.ContentLength)
}
} else {
p, err := homedir.Expand(cctx.Args().First())
if err != nil {
return xerrors.Errorf("expanding path: %w", err)
}
f, err := os.OpenFile(p, os.O_RDONLY, 0)
if err != nil {
return xerrors.Errorf("opening source file: %w", err)
}
if !cctx.IsSet("file-size") {
st, err := f.Stat()
if err != nil {
return xerrors.Errorf("stat: %w", err)
}
sz = uint64(st.Size())
}
r = f
}
var psize abi.PaddedPieceSize
if cctx.Args().Len() == 2 {
rps, err := humanize.ParseBytes(cctx.Args().Get(1))
if err != nil {
return xerrors.Errorf("parsing piece size: %w", err)
}
psize = abi.PaddedPieceSize(rps)
if err := psize.Validate(); err != nil {
return xerrors.Errorf("checking piece size: %w", err)
}
if sz > uint64(psize.Unpadded()) {
return xerrors.Errorf("file larger than the piece")
}
} else {
psize = padreader.PaddedSize(sz).Padded()
}
pc, err := nodeApi.ComputeDataCid(ctx, psize.Unpadded(), r)
if err != nil {
return xerrors.Errorf("computing data CID: %w", err)
}
fmt.Println(pc.PieceCID, " ", pc.Size)
return nil
},
}