lotus/cmd/lotus-miner/sealing.go
Clint Armstrong 93e4656a27 Use a float to represent GPU utilization
Before this change workers can only be allocated one GPU task,
regardless of how much of the GPU resources that task uses, or how many
GPUs are in the system.

This makes GPUUtilization a float, which can represent that a task needs
a fraction of a GPU or multiple GPUs. GPUs are accounted for like RAM and
CPUs, so workers with more GPUs can be allocated more tasks.

A known issue is that PC2 cannot use multiple GPUs: even if the worker
has multiple GPUs and is allocated multiple PC2 tasks, those tasks will
only run on the first GPU.

This could result in unexpected behavior when a worker with multiple
GPUs is assigned multiple PC2 tasks. It should not surprise existing
users who upgrade, however: anyone already running workers with multiple
GPUs will know this limitation and will be running one worker per GPU
for PC2. Those users now have the freedom to set the GPU utilization of
PC2 below one and effectively run multiple PC2 processes in a single
worker.

C2 is capable of utilizing multiple GPUs, and now workers can be
customized for C2 accordingly.
2021-11-30 02:06:58 +01:00
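
The accounting the commit describes can be illustrated with a minimal,
hypothetical sketch (workerResources, taskNeeds and canFit below are
illustrative names, not lotus scheduler API): a task fits on a worker only
while the GPU fractions already in use, plus the task's own GPUUtilization,
stay within the worker's GPU count.

// Sketch only: these types and the canFit helper are illustrative,
// not part of the lotus codebase.
package sketch

// workerResources stands in for a worker's resource totals.
type workerResources struct {
	GPUs    int     // number of GPUs on the worker
	GPUUsed float64 // sum of GPUUtilization of tasks already assigned
}

// taskNeeds stands in for a per-task resource requirement.
type taskNeeds struct {
	GPUUtilization float64 // e.g. 0.5 for half a GPU, 2.0 for two GPUs
}

// canFit reports whether the task's GPU requirement still fits on the
// worker, mirroring how RAM and CPU headroom are checked.
func canFit(w workerResources, t taskNeeds) bool {
	return w.GPUUsed+t.GPUUtilization <= float64(w.GPUs)
}

Under this check a two-GPU worker with 1.5 GPUs already in use could still
take a 0.5-GPU PC2 task, but not a 1.0-GPU one.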


package main

import (
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math"
	"os"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	"github.com/fatih/color"
	"github.com/google/uuid"
	"github.com/urfave/cli/v2"
	"golang.org/x/xerrors"

	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"

	"github.com/filecoin-project/lotus/chain/types"
	lcli "github.com/filecoin-project/lotus/cli"
)

var sealingCmd = &cli.Command{
Name: "sealing",
Usage: "interact with sealing pipeline",
Subcommands: []*cli.Command{
sealingJobsCmd,
sealingWorkersCmd,
sealingSchedDiagCmd,
sealingAbortCmd,
},
}
var barCols = float64(64)
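// barString renders a fixed-width usage bar: yellow for capacity otherwise
// in use on the worker, green for capacity claimed by tasks, blank for free
// capacity.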
func barString(total, y, g float64) string {
	yBars := int(math.Round(y / total * barCols))
	gBars := int(math.Round(g / total * barCols))
	// Clamp the free segment so strings.Repeat never gets a negative count
	// when the bar is over-allocated (e.g. GPU use above 100%).
	eBars := int(barCols) - yBars - gBars
	if eBars < 0 {
		eBars = 0
	}
	return color.YellowString(strings.Repeat("|", yBars)) +
		color.GreenString(strings.Repeat("|", gBars)) +
		strings.Repeat(" ", eBars)
}
var sealingWorkersCmd = &cli.Command{
Name: "workers",
Usage: "list workers",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
color.NoColor = !cctx.Bool("color")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
stats, err := nodeApi.WorkerStats(ctx)
if err != nil {
return err
}
type sortableStat struct {
id uuid.UUID
storiface.WorkerStats
}
st := make([]sortableStat, 0, len(stats))
for id, stat := range stats {
st = append(st, sortableStat{id, stat})
}
sort.Slice(st, func(i, j int) bool {
return st[i].id.String() < st[j].id.String()
})
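		// Print a resource summary (CPU, RAM, virtual memory, GPU) for each worker.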
for _, stat := range st {
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
var disabled string
if !stat.Enabled {
disabled = color.RedString(" (disabled)")
}
fmt.Printf("Worker %s, host %s%s\n", stat.id, color.MagentaString(stat.Info.Hostname), disabled)
fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
barString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
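			// Split RAM usage into memory claimed by sealing tasks (ramTasks) and
			// any additional memory already in use on the worker (ramReserved).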
ramTotal := stat.Info.Resources.MemPhysical
ramTasks := stat.MemUsedMin
ramUsed := stat.Info.Resources.MemUsed
var ramReserved uint64 = 0
if ramUsed > ramTasks {
ramReserved = ramUsed - ramTasks
}
ramBar := barString(float64(ramTotal), float64(ramReserved), float64(ramTasks))
fmt.Printf("\tRAM: [%s] %d%% %s/%s\n", ramBar,
(ramTasks+ramReserved)*100/stat.Info.Resources.MemPhysical,
			types.SizeStr(types.NewInt(ramTasks+ramReserved)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
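			// Virtual memory (physical RAM plus swap) is split the same way.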
vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemTasks := stat.MemUsedMax
vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
var vmemReserved uint64 = 0
if vmemUsed > vmemTasks {
vmemReserved = vmemUsed - vmemTasks
}
vmemBar := barString(float64(vmemTotal), float64(vmemReserved), float64(vmemTasks))
fmt.Printf("\tVMEM: [%s] %d%% %s/%s\n", vmemBar,
(vmemTasks+vmemReserved)*100/vmemTotal,
types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
types.SizeStr(types.NewInt(vmemTotal)))
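			// GPU utilization is tracked as a float, so a worker can be partially
			// used or run more than one GPU task at a time.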
if len(stat.Info.Resources.GPUs) > 0 {
gpuBar := barString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)),
stat.GpuUsed, len(stat.Info.Resources.GPUs))
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}
}
return nil
},
}
var sealingJobsCmd = &cli.Command{
Name: "jobs",
Usage: "list running jobs",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "color",
Usage: "use color in display output",
DefaultText: "depends on output being a TTY",
},
&cli.BoolFlag{
Name: "show-ret-done",
Usage: "show returned but not consumed calls",
},
},
Action: func(cctx *cli.Context) error {
if cctx.IsSet("color") {
color.NoColor = !cctx.Bool("color")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
type line struct {
storiface.WorkerJob
wid uuid.UUID
}
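		// Flatten the per-worker job map into a single slice so the jobs can be
		// sorted and printed together.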
lines := make([]line, 0)
for wid, jobs := range jobs {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: wid,
})
}
}
// oldest first
sort.Slice(lines, func(i, j int) bool {
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
if lines[i].Start.Equal(lines[j].Start) {
return lines[i].ID.ID.String() < lines[j].ID.ID.String()
}
return lines[i].Start.Before(lines[j].Start)
})
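		// Resolve worker IDs to hostnames for display.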
workerHostnames := map[uuid.UUID]string{}
wst, err := nodeApi.WorkerStats(ctx)
if err != nil {
return xerrors.Errorf("getting worker stats: %w", err)
}
for wid, st := range wst {
workerHostnames[wid] = st.Info.Hostname
}
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n")
for _, l := range lines {
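			// Translate the job's RunWait value into a human-readable scheduling state.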
state := "running"
switch {
case l.RunWait > 1:
state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
case l.RunWait == storiface.RWPrepared:
state = "prepared"
case l.RunWait == storiface.RWRetDone:
if !cctx.Bool("show-ret-done") {
continue
}
state = "ret-done"
case l.RunWait == storiface.RWReturned:
state = "returned"
case l.RunWait == storiface.RWRetWait:
state = "ret-wait"
}
dur := "n/a"
if !l.Start.IsZero() {
				dur = time.Since(l.Start).Truncate(time.Millisecond * 100).String()
}
hostname, ok := workerHostnames[l.wid]
if !ok {
hostname = l.Hostname
}
_, _ = fmt.Fprintf(tw, "%s\t%d\t%s\t%s\t%s\t%s\t%s\n",
hex.EncodeToString(l.ID.ID[:4]),
l.Sector.Number,
hex.EncodeToString(l.wid[:4]),
hostname,
l.Task.Short(),
state,
dur)
}
return tw.Flush()
},
}
var sealingSchedDiagCmd = &cli.Command{
Name: "sched-diag",
Usage: "Dump internal scheduler state",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "force-sched",
},
},
Action: func(cctx *cli.Context) error {
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
st, err := nodeApi.SealingSchedDiag(ctx, cctx.Bool("force-sched"))
if err != nil {
return err
}
j, err := json.MarshalIndent(&st, "", " ")
if err != nil {
return err
}
fmt.Println(string(j))
return nil
},
}
var sealingAbortCmd = &cli.Command{
Name: "abort",
Usage: "Abort a running job",
ArgsUsage: "[callid]",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() != 1 {
return xerrors.Errorf("expected 1 argument")
}
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
if err != nil {
return err
}
defer closer()
ctx := lcli.ReqContext(cctx)
jobs, err := nodeApi.WorkerJobs(ctx)
if err != nil {
return xerrors.Errorf("getting worker jobs: %w", err)
}
var job *storiface.WorkerJob
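		// Find the first job whose call ID begins with the given prefix.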
outer:
for _, workerJobs := range jobs {
for _, j := range workerJobs {
if strings.HasPrefix(j.ID.ID.String(), cctx.Args().First()) {
j := j
job = &j
break outer
}
}
}
if job == nil {
return xerrors.Errorf("job with specified id prefix not found")
}
fmt.Printf("aborting job %s, task %s, sector %d, running on host %s\n", job.ID.String(), job.Task.Short(), job.Sector.Number, job.Hostname)
return nodeApi.SealingAbort(ctx, job.ID)
},
}