Merge pull request #7703 from filecoin-project/feat/scheduler-enhancements

Scheduler enhancements
This commit is contained in:
Łukasz Magiera 2021-11-30 19:14:25 +01:00 committed by GitHub
commit 73f16f08e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1936 additions and 162 deletions

View File

@ -231,16 +231,18 @@ func init() {
Hostname: "host",
Resources: storiface.WorkerResources{
MemPhysical: 256 << 30,
MemUsed: 2 << 30,
MemSwap: 120 << 30,
MemReserved: 2 << 30,
MemSwapUsed: 2 << 30,
CPUs: 64,
GPUs: []string{"aGPU 1337"},
Resources: storiface.ResourceTable,
},
},
Enabled: true,
MemUsedMin: 0,
MemUsedMax: 0,
GpuUsed: false,
GpuUsed: 0,
CpuUse: 0,
},
})
@ -286,6 +288,7 @@ func init() {
State: "ShardStateAvailable",
Error: "<error>",
})
addExample(storiface.ResourceTable)
}
func GetAPIType(name, pkg string) (i interface{}, t reflect.Type, permStruct []reflect.Type) {

View File

@ -58,7 +58,7 @@ var (
FullAPIVersion1 = newVer(2, 1, 0)
MinerAPIVersion0 = newVer(1, 2, 0)
WorkerAPIVersion0 = newVer(1, 1, 0)
WorkerAPIVersion0 = newVer(1, 5, 0)
)
//nolint:varcheck,deadcode

Binary file not shown.

Binary file not shown.

View File

@ -4,6 +4,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"math"
"os"
"sort"
"strings"
@ -32,6 +33,17 @@ var sealingCmd = &cli.Command{
},
}
var barCols = float64(64)
func barString(total, y, g float64) string {
yBars := int(math.Round(y / total * barCols))
gBars := int(math.Round(g / total * barCols))
eBars := int(barCols) - yBars - gBars
return color.YellowString(strings.Repeat("|", yBars)) +
color.GreenString(strings.Repeat("|", gBars)) +
strings.Repeat(" ", eBars)
}
var sealingWorkersCmd = &cli.Command{
Name: "workers",
Usage: "list workers",
@ -77,7 +89,7 @@ var sealingWorkersCmd = &cli.Command{
for _, stat := range st {
gpuUse := "not "
gpuCol := color.FgBlue
if stat.GpuUsed {
if stat.GpuUsed > 0 {
gpuCol = color.FgGreen
gpuUse = ""
}
@ -89,56 +101,43 @@ var sealingWorkersCmd = &cli.Command{
fmt.Printf("Worker %s, host %s%s\n", stat.id, color.MagentaString(stat.Info.Hostname), disabled)
var barCols = uint64(64)
cpuBars := int(stat.CpuUse * barCols / stat.Info.Resources.CPUs)
cpuBar := strings.Repeat("|", cpuBars)
if int(barCols)-cpuBars >= 0 {
cpuBar += strings.Repeat(" ", int(barCols)-cpuBars)
}
fmt.Printf("\tCPU: [%s] %d/%d core(s) in use\n",
color.GreenString(cpuBar), stat.CpuUse, stat.Info.Resources.CPUs)
barString(float64(stat.Info.Resources.CPUs), 0, float64(stat.CpuUse)), stat.CpuUse, stat.Info.Resources.CPUs)
ramBarsRes := int(stat.Info.Resources.MemReserved * barCols / stat.Info.Resources.MemPhysical)
ramBarsUsed := int(stat.MemUsedMin * barCols / stat.Info.Resources.MemPhysical)
ramRepeatSpace := int(barCols) - (ramBarsUsed + ramBarsRes)
colorFunc := color.YellowString
if ramRepeatSpace < 0 {
ramRepeatSpace = 0
colorFunc = color.RedString
ramTotal := stat.Info.Resources.MemPhysical
ramTasks := stat.MemUsedMin
ramUsed := stat.Info.Resources.MemUsed
var ramReserved uint64 = 0
if ramUsed > ramTasks {
ramReserved = ramUsed - ramTasks
}
ramBar := colorFunc(strings.Repeat("|", ramBarsRes)) +
color.GreenString(strings.Repeat("|", ramBarsUsed)) +
strings.Repeat(" ", ramRepeatSpace)
vmem := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemBarsRes := int(stat.Info.Resources.MemReserved * barCols / vmem)
vmemBarsUsed := int(stat.MemUsedMax * barCols / vmem)
vmemRepeatSpace := int(barCols) - (vmemBarsUsed + vmemBarsRes)
colorFunc = color.YellowString
if vmemRepeatSpace < 0 {
vmemRepeatSpace = 0
colorFunc = color.RedString
}
vmemBar := colorFunc(strings.Repeat("|", vmemBarsRes)) +
color.GreenString(strings.Repeat("|", vmemBarsUsed)) +
strings.Repeat(" ", vmemRepeatSpace)
ramBar := barString(float64(ramTotal), float64(ramReserved), float64(ramTasks))
fmt.Printf("\tRAM: [%s] %d%% %s/%s\n", ramBar,
(stat.Info.Resources.MemReserved+stat.MemUsedMin)*100/stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved+stat.MemUsedMin)),
(ramTasks+ramReserved)*100/stat.Info.Resources.MemPhysical,
types.SizeStr(types.NewInt(ramTasks+ramUsed)),
types.SizeStr(types.NewInt(stat.Info.Resources.MemPhysical)))
fmt.Printf("\tVMEM: [%s] %d%% %s/%s\n", vmemBar,
(stat.Info.Resources.MemReserved+stat.MemUsedMax)*100/vmem,
types.SizeStr(types.NewInt(stat.Info.Resources.MemReserved+stat.MemUsedMax)),
types.SizeStr(types.NewInt(vmem)))
vmemTotal := stat.Info.Resources.MemPhysical + stat.Info.Resources.MemSwap
vmemTasks := stat.MemUsedMax
vmemUsed := stat.Info.Resources.MemUsed + stat.Info.Resources.MemSwapUsed
var vmemReserved uint64 = 0
if vmemUsed > vmemTasks {
vmemReserved = vmemUsed - vmemTasks
}
vmemBar := barString(float64(vmemTotal), float64(vmemReserved), float64(vmemTasks))
fmt.Printf("\tVMEM: [%s] %d%% %s/%s\n", vmemBar,
(vmemTasks+vmemReserved)*100/vmemTotal,
types.SizeStr(types.NewInt(vmemTasks+vmemReserved)),
types.SizeStr(types.NewInt(vmemTotal)))
if len(stat.Info.Resources.GPUs) > 0 {
gpuBar := barString(float64(len(stat.Info.Resources.GPUs)), 0, stat.GpuUsed)
fmt.Printf("\tGPU: [%s] %.f%% %.2f/%d gpu(s) in use\n", color.GreenString(gpuBar),
stat.GpuUsed*100/float64(len(stat.Info.Resources.GPUs)),
stat.GpuUsed, len(stat.Info.Resources.GPUs))
}
for _, gpu := range stat.Info.Resources.GPUs {
fmt.Printf("\tGPU: %s\n", color.New(gpuCol).Sprintf("%s, %sused", gpu, gpuUse))
}

View File

@ -58,8 +58,11 @@ var infoCmd = &cli.Command{
fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))
fmt.Printf("RAM: %s/%s; Swap: %s/%s\n",
types.SizeStr(types.NewInt(info.Resources.MemUsed)),
types.SizeStr(types.NewInt(info.Resources.MemPhysical)),
types.SizeStr(types.NewInt(info.Resources.MemSwapUsed)),
types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Task types: ")
for _, t := range ttList(tt) {

View File

@ -60,6 +60,7 @@ func main() {
storageCmd,
setCmd,
waitQuietCmd,
resourcesCmd,
tasksCmd,
}

View File

@ -0,0 +1,72 @@
package main
import (
"fmt"
"os"
"sort"
"github.com/urfave/cli/v2"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
var resourcesCmd = &cli.Command{
Name: "resources",
Usage: "Manage resource table overrides",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "all",
Usage: "print all resource envvars",
},
&cli.BoolFlag{
Name: "default",
Usage: "print default resource envvars",
},
},
Action: func(cctx *cli.Context) error {
def := map[string]string{}
set := map[string]string{}
all := map[string]string{}
_, err := storiface.ParseResourceEnv(func(key, d string) (string, bool) {
if d != "" {
all[key] = d
def[key] = d
}
s, ok := os.LookupEnv(key)
if ok {
all[key] = s
set[key] = s
}
return s, ok
})
if err != nil {
return err
}
printMap := func(m map[string]string) {
var arr []string
for k, v := range m {
arr = append(arr, fmt.Sprintf("%s=%s", k, v))
}
sort.Strings(arr)
for _, s := range arr {
fmt.Println(s)
}
}
if cctx.Bool("default") {
printMap(def)
} else {
if cctx.Bool("all") {
printMap(all)
} else {
printMap(set)
}
}
return nil
},
}

View File

@ -2453,18 +2453,595 @@ Response:
"IgnoreResources": false,
"Resources": {
"MemPhysical": 274877906944,
"MemUsed": 2147483648,
"MemSwap": 128849018880,
"MemReserved": 2147483648,
"MemSwapUsed": 2147483648,
"CPUs": 64,
"GPUs": [
"aGPU 1337"
]
],
"Resources": {
"seal/v0/addpiece": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/commit/1": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/commit/2": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10737418240
},
"3": {
"MinMemory": 32212254720,
"MaxMemory": 161061273600,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 34359738368
},
"4": {
"MinMemory": 64424509440,
"MaxMemory": 204010946560,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 68719476736
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10737418240
},
"8": {
"MinMemory": 32212254720,
"MaxMemory": 161061273600,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 34359738368
},
"9": {
"MinMemory": 64424509440,
"MaxMemory": 204010946560,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 68719476736
}
},
"seal/v0/fetch": {
"0": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"1": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"2": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"3": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"4": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"5": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"6": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"7": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"8": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"9": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
}
},
"seal/v0/precommit/1": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"3": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"4": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"8": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"9": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
}
},
"seal/v0/precommit/2": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 16106127360,
"MaxMemory": 16106127360,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 32212254720,
"MaxMemory": 32212254720,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 16106127360,
"MaxMemory": 16106127360,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 32212254720,
"MaxMemory": 32212254720,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
}
},
"seal/v0/unseal": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"3": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"4": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"8": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"9": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
}
}
}
}
},
"Enabled": true,
"MemUsedMin": 0,
"MemUsedMax": 0,
"GpuUsed": false,
"GpuUsed": 0,
"CpuUse": 0
}
}

View File

@ -92,10 +92,587 @@ Response:
"IgnoreResources": true,
"Resources": {
"MemPhysical": 42,
"MemUsed": 42,
"MemSwap": 42,
"MemReserved": 42,
"MemSwapUsed": 42,
"CPUs": 42,
"GPUs": null
"GPUs": null,
"Resources": {
"seal/v0/addpiece": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 4294967296,
"MaxMemory": 4294967296,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 8589934592,
"MaxMemory": 8589934592,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/commit/1": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 1073741824,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
}
},
"seal/v0/commit/2": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10737418240
},
"3": {
"MinMemory": 32212254720,
"MaxMemory": 161061273600,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 34359738368
},
"4": {
"MinMemory": 64424509440,
"MaxMemory": 204010946560,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 68719476736
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 1,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10737418240
},
"8": {
"MinMemory": 32212254720,
"MaxMemory": 161061273600,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 34359738368
},
"9": {
"MinMemory": 64424509440,
"MaxMemory": 204010946560,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 68719476736
}
},
"seal/v0/fetch": {
"0": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"1": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"2": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"3": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"4": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"5": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"6": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"7": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"8": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
},
"9": {
"MinMemory": 1048576,
"MaxMemory": 1048576,
"GPUUtilization": 0,
"MaxParallelism": 0,
"MaxParallelismGPU": 0,
"BaseMinMemory": 0
}
},
"seal/v0/precommit/1": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"3": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"4": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"8": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"9": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
}
},
"seal/v0/precommit/2": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"3": {
"MinMemory": 16106127360,
"MaxMemory": 16106127360,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"4": {
"MinMemory": 32212254720,
"MaxMemory": 32212254720,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 1073741824,
"MaxMemory": 1610612736,
"GPUUtilization": 0,
"MaxParallelism": -1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1073741824
},
"8": {
"MinMemory": 16106127360,
"MaxMemory": 16106127360,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
},
"9": {
"MinMemory": 32212254720,
"MaxMemory": 32212254720,
"GPUUtilization": 1,
"MaxParallelism": -1,
"MaxParallelismGPU": 6,
"BaseMinMemory": 1073741824
}
},
"seal/v0/unseal": {
"0": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"1": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"2": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"3": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"4": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"5": {
"MinMemory": 2048,
"MaxMemory": 2048,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 2048
},
"6": {
"MinMemory": 8388608,
"MaxMemory": 8388608,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 8388608
},
"7": {
"MinMemory": 805306368,
"MaxMemory": 1073741824,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 1048576
},
"8": {
"MinMemory": 60129542144,
"MaxMemory": 68719476736,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
},
"9": {
"MinMemory": 120259084288,
"MaxMemory": 137438953472,
"GPUUtilization": 0,
"MaxParallelism": 1,
"MaxParallelismGPU": 0,
"BaseMinMemory": 10485760
}
}
}
}
}
```

View File

@ -15,6 +15,7 @@ COMMANDS:
storage manage sector storage
set Manage worker settings
wait-quiet Block until all running tasks exit
resources Manage resource table overrides
tasks Manage task processing
help, h Shows a list of commands or help for one command
@ -127,6 +128,21 @@ OPTIONS:
```
## lotus-worker resources
```
NAME:
lotus-worker resources - Manage resource table overrides
USAGE:
lotus-worker resources [command options] [arguments...]
OPTIONS:
--all print all resource envvars (default: false)
--default print default resource envvars (default: false)
--help, -h show help (default: false)
```
## lotus-worker tasks
```
NAME:

12
extern/sector-storage/cgroups.go vendored Normal file
View File

@ -0,0 +1,12 @@
//go:build !linux
// +build !linux
package sectorstorage
func cgroupV1Mem() (memoryMax, memoryUsed, swapMax, swapUsed uint64, err error) {
return 0, 0, 0, 0, nil
}
func cgroupV2Mem() (memoryMax, memoryUsed, swapMax, swapUsed uint64, err error) {
return 0, 0, 0, 0, nil
}

117
extern/sector-storage/cgroups_linux.go vendored Normal file
View File

@ -0,0 +1,117 @@
//go:build linux
// +build linux
package sectorstorage
import (
"bufio"
"bytes"
"math"
"os"
"path/filepath"
"github.com/containerd/cgroups"
cgroupv2 "github.com/containerd/cgroups/v2"
)
func cgroupV2MountPoint() (string, error) {
f, err := os.Open("/proc/self/mountinfo")
if err != nil {
return "", err
}
defer f.Close() //nolint
scanner := bufio.NewScanner(f)
for scanner.Scan() {
fields := bytes.Fields(scanner.Bytes())
if len(fields) >= 9 && bytes.Equal(fields[8], []byte("cgroup2")) {
return string(fields[4]), nil
}
}
return "", cgroups.ErrMountPointNotExist
}
func cgroupV1Mem() (memoryMax, memoryUsed, swapMax, swapUsed uint64, err error) {
path := cgroups.NestedPath("")
if pid := os.Getpid(); pid == 1 {
path = cgroups.RootPath
}
c, err := cgroups.Load(cgroups.SingleSubsystem(cgroups.V1, cgroups.Memory), path)
if err != nil {
return 0, 0, 0, 0, err
}
stats, err := c.Stat()
if err != nil {
return 0, 0, 0, 0, err
}
if stats.Memory == nil {
return 0, 0, 0, 0, nil
}
if stats.Memory.Usage != nil {
memoryMax = stats.Memory.Usage.Limit
// Exclude cached files
memoryUsed = stats.Memory.Usage.Usage - stats.Memory.InactiveFile - stats.Memory.ActiveFile
}
if stats.Memory.Swap != nil {
swapMax = stats.Memory.Swap.Limit
swapUsed = stats.Memory.Swap.Usage
}
return memoryMax, memoryUsed, swapMax, swapUsed, nil
}
func cgroupV2MemFromPath(mp, path string) (memoryMax, memoryUsed, swapMax, swapUsed uint64, err error) {
c, err := cgroupv2.LoadManager(mp, path)
if err != nil {
return 0, 0, 0, 0, err
}
stats, err := c.Stat()
if err != nil {
return 0, 0, 0, 0, err
}
if stats.Memory != nil {
memoryMax = stats.Memory.UsageLimit
// Exclude memory used caching files
memoryUsed = stats.Memory.Usage - stats.Memory.File
swapMax = stats.Memory.SwapLimit
swapUsed = stats.Memory.SwapUsage
}
return memoryMax, memoryUsed, swapMax, swapUsed, nil
}
func cgroupV2Mem() (memoryMax, memoryUsed, swapMax, swapUsed uint64, err error) {
memoryMax = math.MaxUint64
swapMax = math.MaxUint64
path, err := cgroupv2.PidGroupPath(os.Getpid())
if err != nil {
return 0, 0, 0, 0, err
}
mp, err := cgroupV2MountPoint()
if err != nil {
return 0, 0, 0, 0, err
}
for path != "/" {
cgMemoryMax, cgMemoryUsed, cgSwapMax, cgSwapUsed, err := cgroupV2MemFromPath(mp, path)
if err != nil {
return 0, 0, 0, 0, err
}
if cgMemoryMax != 0 && cgMemoryMax < memoryMax {
log.Debugf("memory limited by cgroup %s: %v", path, cgMemoryMax)
memoryMax = cgMemoryMax
memoryUsed = cgMemoryUsed
}
if cgSwapMax != 0 && cgSwapMax < swapMax {
log.Debugf("swap limited by cgroup %s: %v", path, cgSwapMax)
swapMax = cgSwapMax
swapUsed = cgSwapUsed
}
path = filepath.Dir(path)
}
return memoryMax, memoryUsed, swapMax, swapUsed, nil
}

View File

@ -51,13 +51,8 @@ type SectorManager interface {
FaultTracker
}
type WorkerID uuid.UUID // worker session UUID
var ClosedWorkerID = uuid.UUID{}
func (w WorkerID) String() string {
return uuid.UUID(w).String()
}
type Manager struct {
ls stores.LocalStorage
storage *stores.Remote

View File

@ -332,7 +332,7 @@ func TestRestartWorker(t *testing.T) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
@ -368,7 +368,7 @@ func TestRestartWorker(t *testing.T) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
err = m.AddWorker(ctx, w)
require.NoError(t, err)
@ -404,7 +404,7 @@ func TestReenableWorker(t *testing.T) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
}, stor, lstor, idx, m, statestore.New(wds))
}, os.LookupEnv, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
@ -453,3 +453,123 @@ func TestReenableWorker(t *testing.T) {
i, _ = m.sched.Info(ctx)
require.Len(t, i.(SchedDiagInfo).OpenWindows, 2)
}
func TestResUse(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
defer cleanup()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
wds := datastore.NewMapDatastore()
arch := make(chan chan apres)
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
}, func(s string) (string, bool) {
return "", false
}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
go func() {
_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.Error(t, err)
}()
l:
for {
st := m.WorkerStats()
require.Len(t, st, 1)
for _, w := range st {
if w.MemUsedMax > 0 {
break l
}
time.Sleep(time.Millisecond)
}
}
st := m.WorkerStats()
require.Len(t, st, 1)
for _, w := range st {
require.Equal(t, storiface.ResourceTable[sealtasks.TTAddPiece][abi.RegisteredSealProof_StackedDrg2KiBV1].MaxMemory, w.MemUsedMax)
}
}
func TestResOverride(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
m, lstor, stor, idx, cleanup := newTestMgr(ctx, t, ds)
defer cleanup()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
wds := datastore.NewMapDatastore()
arch := make(chan chan apres)
w := newLocalWorker(func() (ffiwrapper.Storage, error) {
return &testExec{apch: arch}, nil
}, WorkerConfig{
TaskTypes: localTasks,
}, func(s string) (string, bool) {
if s == "AP_2K_MAX_MEMORY" {
return "99999", true
}
return "", false
}, stor, lstor, idx, m, statestore.New(wds))
err := m.AddWorker(ctx, w)
require.NoError(t, err)
sid := storage.SectorRef{
ID: abi.SectorID{Miner: 1000, Number: 1},
ProofType: abi.RegisteredSealProof_StackedDrg2KiBV1,
}
go func() {
_, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.Error(t, err)
}()
l:
for {
st := m.WorkerStats()
require.Len(t, st, 1)
for _, w := range st {
if w.MemUsedMax > 0 {
break l
}
time.Sleep(time.Millisecond)
}
}
st := m.WorkerStats()
require.Len(t, st, 1)
for _, w := range st {
require.Equal(t, uint64(99999), w.MemUsedMax)
}
}

View File

@ -7,6 +7,7 @@ import (
"math/rand"
"net"
"net/http"
"os"
"testing"
"github.com/filecoin-project/go-state-types/abi"
@ -286,7 +287,7 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
worker := newLocalWorker(nil, WorkerConfig{
TaskTypes: tasks,
}, remote, localStore, p.index, p.mgr, csts)
}, os.LookupEnv, remote, localStore, p.index, p.mgr, csts)
p.servers = append(p.servers, svc)
p.localStores = append(p.localStores, localStore)

View File

@ -53,7 +53,7 @@ type WorkerSelector interface {
type scheduler struct {
workersLk sync.RWMutex
workers map[WorkerID]*workerHandle
workers map[storiface.WorkerID]*workerHandle
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
@ -95,7 +95,7 @@ type workerHandle struct {
}
type schedWindowRequest struct {
worker WorkerID
worker storiface.WorkerID
done chan *schedWindow
}
@ -107,14 +107,14 @@ type schedWindow struct {
type workerDisableReq struct {
activeWindows []*schedWindow
wid WorkerID
wid storiface.WorkerID
done func()
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
gpuUsed float64
cpuUse uint64
cond *sync.Cond
@ -145,7 +145,7 @@ type workerResponse struct {
func newScheduler() *scheduler {
return &scheduler{
workers: map[WorkerID]*workerHandle{},
workers: map[storiface.WorkerID]*workerHandle{},
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest, 20),
@ -378,7 +378,6 @@ func (sh *scheduler) trySched() {
}()
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
task.indexHeap = sqi
for wnd, windowRequest := range sh.openWindows {
@ -394,6 +393,8 @@ func (sh *scheduler) trySched() {
continue
}
needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info) {
continue
@ -457,7 +458,6 @@ func (sh *scheduler) trySched() {
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]
selectedWindow := -1
for _, wnd := range acceptableWindows[task.indexHeap] {
@ -466,6 +466,8 @@ func (sh *scheduler) trySched() {
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.ID.Number, wnd)
needRes := info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, "schedAssign", info) {
continue

View File

@ -6,7 +6,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerInfo, r Resources, locker sync.Locker, cb func() error) error {
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
@ -30,20 +30,20 @@ func (a *activeResources) hasWorkWaiting() bool {
return a.waiting > 0
}
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = true
func (a *activeResources) add(wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed += r.GPUUtilization
}
a.cpuUse += r.Threads(wr.CPUs)
a.cpuUse += r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
}
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = false
func (a *activeResources) free(wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
a.cpuUse -= r.Threads(wr.CPUs)
a.cpuUse -= r.Threads(wr.CPUs, len(wr.GPUs))
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
@ -54,35 +54,44 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
// canHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *activeResources) canHandleRequest(needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if info.IgnoreResources {
// shortcircuit; if this worker is ignoring resources, it can always handle the request.
return true
}
res := info.Resources
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %s for %s; not enough physical memory - need: %dM, have %dM", wid, caller, minNeedMem/mib, res.MemPhysical/mib)
memNeeded := needRes.MinMemory + needRes.BaseMinMemory
memUsed := a.memUsedMin
// assume that MemUsed can be swapped, so only check it in the vmem Check
memAvail := res.MemPhysical - memUsed
if memNeeded > memAvail {
log.Debugf("sched: not scheduling on worker %s for %s; not enough physical memory - need: %dM, have %dM available", wid, caller, memNeeded/mib, memAvail/mib)
return false
}
maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
vmemNeeded := needRes.MaxMemory + needRes.BaseMinMemory
vmemUsed := a.memUsedMax
if vmemUsed < res.MemUsed+res.MemSwapUsed {
vmemUsed = res.MemUsed + res.MemSwapUsed
}
vmemAvail := res.MemPhysical + res.MemSwap - vmemUsed
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %s for %s; not enough virtual memory - need: %dM, have %dM", wid, caller, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
if vmemNeeded > vmemAvail {
log.Debugf("sched: not scheduling on worker %s for %s; not enough virtual memory - need: %dM, have %dM available", wid, caller, vmemNeeded/mib, vmemAvail/mib)
return false
}
if a.cpuUse+needRes.Threads(res.CPUs) > res.CPUs {
log.Debugf("sched: not scheduling on worker %s for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs), a.cpuUse, res.CPUs)
if a.cpuUse+needRes.Threads(res.CPUs, len(res.GPUs)) > res.CPUs {
log.Debugf("sched: not scheduling on worker %s for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs, len(res.GPUs)), a.cpuUse, res.CPUs)
return false
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if a.gpuUsed {
log.Debugf("sched: not scheduling on worker %s for %s; GPU in use", wid, caller)
if len(res.GPUs) > 0 && needRes.GPUUtilization > 0 {
if a.gpuUsed+needRes.GPUUtilization > float64(len(res.GPUs)) {
log.Debugf("sched: not scheduling on worker %s for %s; GPU(s) in use", wid, caller)
return false
}
}
@ -96,12 +105,21 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
cpu := float64(a.cpuUse) / float64(wr.CPUs)
max = cpu
memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
memUsed := a.memUsedMin
if memUsed < wr.MemUsed {
memUsed = wr.MemUsed
}
memMin := float64(memUsed) / float64(wr.MemPhysical)
if memMin > max {
max = memMin
}
memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
vmemUsed := a.memUsedMax
if a.memUsedMax < wr.MemUsed+wr.MemSwapUsed {
vmemUsed = wr.MemUsed + wr.MemSwapUsed
}
memMax := float64(vmemUsed) / float64(wr.MemPhysical+wr.MemSwap)
if memMax > max {
max = memMax
}

View File

@ -41,14 +41,16 @@ func TestWithPriority(t *testing.T) {
var decentWorkerResources = storiface.WorkerResources{
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemReserved: 2 << 30,
MemUsed: 1 << 30,
MemSwapUsed: 1 << 30,
CPUs: 32,
GPUs: []string{"a GPU"},
GPUs: []string{},
}
var constrainedWorkerResources = storiface.WorkerResources{
MemPhysical: 1 << 30,
MemReserved: 2 << 30,
MemUsed: 1 << 30,
MemSwapUsed: 1 << 30,
CPUs: 1,
}
@ -188,6 +190,9 @@ func TestSchedStartStop(t *testing.T) {
}
func TestSched(t *testing.T) {
storiface.ParallelNum = 1
storiface.ParallelDenom = 1
ctx, done := context.WithTimeout(context.Background(), 30*time.Second)
defer done()
@ -254,7 +259,9 @@ func TestSched(t *testing.T) {
return nil
}, noopAction)
if err != context.Canceled {
require.NoError(t, err, fmt.Sprint(l, l2))
}
}()
<-sched.testSync
@ -299,9 +306,6 @@ func TestSched(t *testing.T) {
}
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
ParallelNum = 1
ParallelDenom = 1
return func(t *testing.T) {
index := stores.NewIndex()
@ -558,7 +562,7 @@ func BenchmarkTrySched(b *testing.B) {
b.StopTimer()
sched := newScheduler()
sched.workers[WorkerID{}] = &workerHandle{
sched.workers[storiface.WorkerID{}] = &workerHandle{
workerRpc: nil,
info: storiface.WorkerInfo{
Hostname: "t",
@ -570,7 +574,7 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < windows; i++ {
sched.openWindows = append(sched.openWindows, &schedWindowRequest{
worker: WorkerID{},
worker: storiface.WorkerID{},
done: make(chan *schedWindow, 1000),
})
}
@ -616,7 +620,7 @@ func TestWindowCompact(t *testing.T) {
taskType: task,
sector: storage.SectorRef{ProofType: spt},
})
window.allocated.add(wh.info.Resources, ResourceTable[task][spt])
window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@ -635,7 +639,7 @@ func TestWindowCompact(t *testing.T) {
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
expectRes.add(wh.info.Resources, ResourceTable[task][spt])
expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)

View File

@ -4,17 +4,18 @@ import (
"context"
"time"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
type schedWorker struct {
sched *scheduler
worker *workerHandle
wid WorkerID
wid storiface.WorkerID
heartbeatTimer *time.Ticker
scheduledWindows chan *schedWindow
@ -50,7 +51,7 @@ func (sh *scheduler) runWorker(ctx context.Context, w Worker) error {
closedMgr: make(chan struct{}),
}
wid := WorkerID(sessID)
wid := storiface.WorkerID(sessID)
sh.workersLk.Lock()
_, exist := sh.workers[wid]
@ -237,7 +238,7 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool {
continue
}
if WorkerID(curSes) != sw.wid {
if storiface.WorkerID(curSes) != sw.wid {
if curSes != ClosedWorkerID {
// worker restarted
log.Warnw("worker session changed (worker restarted?)", "initial", sw.wid, "current", curSes)
@ -296,7 +297,7 @@ func (sw *schedWorker) workerCompactWindows() {
var moved []int
for ti, todo := range window.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if !lower.allocated.canHandleRequest(needRes, sw.wid, "compactWindows", worker.info) {
continue
}
@ -357,7 +358,7 @@ assignLoop:
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if worker.preparing.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
@ -418,7 +419,7 @@ assignLoop:
continue
}
needRes := ResourceTable[todo.taskType][todo.sector.ProofType]
needRes := storiface.ResourceTable[todo.taskType][todo.sector.ProofType]
if worker.active.canHandleRequest(needRes, sw.wid, "startPreparing", worker.info) {
tidx = t
break
@ -456,7 +457,7 @@ assignLoop:
func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched
needRes := ResourceTable[req.taskType][req.sector.ProofType]
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.lk.Lock()
w.preparing.add(w.info.Resources, needRes)
@ -539,7 +540,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w, sh := sw.worker, sw.sched
needRes := ResourceTable[req.taskType][req.sector.ProofType]
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
w.active.add(w.info.Resources, needRes)
@ -579,7 +580,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
return nil
}
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
select {
case <-w.closingMgr:
default:

View File

@ -1,19 +1,31 @@
package sectorstorage
package storiface
import (
"github.com/filecoin-project/go-state-types/abi"
"fmt"
"reflect"
"strconv"
"strings"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
type Resources struct {
MinMemory uint64 // What Must be in RAM for decent perf
MaxMemory uint64 // Memory required (swap + ram)
MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf
MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram)
MaxParallelism int // -1 = multithread
CanGPU bool
// GPUUtilization specifes the number of GPUs a task can use
GPUUtilization float64 `envname:"GPU_UTILIZATION"`
BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
// MaxParallelism specifies the number of CPU cores when GPU is NOT in use
MaxParallelism int `envname:"MAX_PARALLELISM"` // -1 = multithread
// MaxParallelismGPU specifies the number of CPU cores when GPU is in use
MaxParallelismGPU int `envname:"MAX_PARALLELISM_GPU"` // when 0, inherits MaxParallelism
BaseMinMemory uint64 `envname:"BASE_MIN_MEMORY"` // What Must be in RAM for decent perf (shared between threads)
}
/*
@ -32,8 +44,14 @@ var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64) uint64 {
if r.MaxParallelism == -1 {
func (r Resources) Threads(wcpus uint64, gpus int) uint64 {
mp := r.MaxParallelism
if r.GPUUtilization > 0 && gpus > 0 && r.MaxParallelismGPU != 0 { // task can use GPUs and worker has some
mp = r.MaxParallelismGPU
}
if mp == -1 {
n := (wcpus * ParallelNum) / ParallelDenom
if n == 0 {
return wcpus
@ -41,7 +59,7 @@ func (r Resources) Threads(wcpus uint64) uint64 {
return n
}
return uint64(r.MaxParallelism)
return uint64(mp)
}
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
@ -135,7 +153,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 30 << 30,
MaxParallelism: -1,
CanGPU: true,
MaxParallelismGPU: 6,
GPUUtilization: 1.0,
BaseMinMemory: 1 << 30,
},
@ -144,7 +163,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 15 << 30,
MaxParallelism: -1,
CanGPU: true,
MaxParallelismGPU: 6,
GPUUtilization: 1.0,
BaseMinMemory: 1 << 30,
},
@ -221,7 +241,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 60 << 30,
MaxParallelism: -1,
CanGPU: true,
MaxParallelismGPU: 6,
GPUUtilization: 1.0,
BaseMinMemory: 64 << 30, // params
},
@ -230,7 +251,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 30 << 30,
MaxParallelism: -1,
CanGPU: true,
MaxParallelismGPU: 6,
GPUUtilization: 1.0,
BaseMinMemory: 32 << 30, // params
},
@ -239,7 +261,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 30,
MaxParallelism: 1, // This is fine
CanGPU: true,
GPUUtilization: 1.0,
BaseMinMemory: 10 << 30,
},
@ -248,7 +270,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 2 << 10,
MaxParallelism: 1,
CanGPU: true,
GPUUtilization: 1.0,
BaseMinMemory: 2 << 10,
},
@ -257,7 +279,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 8 << 20,
MaxParallelism: 1,
CanGPU: true,
GPUUtilization: 1.0,
BaseMinMemory: 8 << 20,
},
@ -268,7 +290,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 20,
MaxParallelism: 0,
CanGPU: false,
GPUUtilization: 0,
BaseMinMemory: 0,
},
@ -277,7 +299,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 20,
MaxParallelism: 0,
CanGPU: false,
GPUUtilization: 0,
BaseMinMemory: 0,
},
@ -286,7 +308,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 20,
MaxParallelism: 0,
CanGPU: false,
GPUUtilization: 0,
BaseMinMemory: 0,
},
@ -295,7 +317,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 20,
MaxParallelism: 0,
CanGPU: false,
GPUUtilization: 0,
BaseMinMemory: 0,
},
@ -304,7 +326,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MinMemory: 1 << 20,
MaxParallelism: 0,
CanGPU: false,
GPUUtilization: 0,
BaseMinMemory: 0,
},
@ -323,3 +345,83 @@ func init() {
m[abi.RegisteredSealProof_StackedDrg64GiBV1_1] = m[abi.RegisteredSealProof_StackedDrg64GiBV1]
}
}
func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources, error) {
out := map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{}
for taskType, defTT := range ResourceTable {
out[taskType] = map[abi.RegisteredSealProof]Resources{}
for spt, defRes := range defTT {
r := defRes // copy
spsz, err := spt.SectorSize()
if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err)
}
shortSize := strings.TrimSuffix(spsz.ShortString(), "iB")
rr := reflect.ValueOf(&r)
for i := 0; i < rr.Elem().Type().NumField(); i++ {
f := rr.Elem().Type().Field(i)
envname := f.Tag.Get("envname")
if envname == "" {
return nil, xerrors.Errorf("no envname for field '%s'", f.Name)
}
envval, found := lookup(taskType.Short()+"_"+shortSize+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
if !found {
// special multicore SDR handling
if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" {
v, ok := rr.Elem().Field(i).Addr().Interface().(*int)
if !ok {
// can't happen, but let's not panic
return nil, xerrors.Errorf("res.MAX_PARALLELISM is not int (!?): %w", err)
}
*v, err = getSDRThreads(lookup)
if err != nil {
return nil, err
}
}
continue
}
v := rr.Elem().Field(i).Addr().Interface()
switch fv := v.(type) {
case *uint64:
*fv, err = strconv.ParseUint(envval, 10, 64)
case *int:
*fv, err = strconv.Atoi(envval)
case *float64:
*fv, err = strconv.ParseFloat(envval, 64)
default:
return nil, xerrors.Errorf("unknown resource field type")
}
}
out[taskType][spt] = r
}
}
return out, nil
}
func getSDRThreads(lookup func(key, def string) (string, bool)) (_ int, err error) {
producers := 0
if v, _ := lookup("FIL_PROOFS_USE_MULTICORE_SDR", ""); v == "1" {
producers = 3
if penv, found := lookup("FIL_PROOFS_MULTICORE_SDR_PRODUCERS", ""); found {
producers, err = strconv.Atoi(penv)
if err != nil {
return 0, xerrors.Errorf("parsing (atoi) FIL_PROOFS_MULTICORE_SDR_PRODUCERS: %w", err)
}
}
}
// producers + the one core actually doing the work
return producers + 1, nil
}

View File

@ -0,0 +1,75 @@
package storiface
import (
"fmt"
"testing"
stabi "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/stretchr/testify/require"
)
func TestListResourceVars(t *testing.T) {
_, err := ParseResourceEnv(func(key, def string) (string, bool) {
if def != "" {
fmt.Printf("%s=%s\n", key, def)
}
return "", false
})
require.NoError(t, err)
}
func TestListResourceOverride(t *testing.T) {
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
if key == "UNS_2K_MAX_PARALLELISM" {
return "2", true
}
if key == "PC2_2K_GPU_UTILIZATION" {
return "0.4", true
}
if key == "PC2_2K_MAX_MEMORY" {
return "2222", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 0.4, rt[sealtasks.TTPreCommit2][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].GPUUtilization)
require.Equal(t, uint64(2222), rt[sealtasks.TTPreCommit2][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxMemory)
// check that defaults don't get mutated
require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
}
func TestListResourceSDRMulticoreOverride(t *testing.T) {
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
return "1", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 4, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 4, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
rt, err = ParseResourceEnv(func(key, def string) (string, bool) {
if key == "FIL_PROOFS_USE_MULTICORE_SDR" {
return "1", true
}
if key == "FIL_PROOFS_MULTICORE_SDR_PRODUCERS" {
return "9000", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
}

View File

@ -15,6 +15,12 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)
type WorkerID uuid.UUID // worker session UUID
func (w WorkerID) String() string {
return uuid.UUID(w).String()
}
type WorkerInfo struct {
Hostname string
@ -28,12 +34,35 @@ type WorkerInfo struct {
type WorkerResources struct {
MemPhysical uint64
MemUsed uint64
MemSwap uint64
MemReserved uint64 // Used by system / other processes
MemSwapUsed uint64
CPUs uint64 // Logical cores
GPUs []string
// if nil use the default resource table
Resources map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
}
func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks.TaskType) Resources {
res := ResourceTable[tt][spt]
// if the worker specifies custom resource table, prefer that
if wr.Resources != nil {
tr, ok := wr.Resources[tt]
if !ok {
return res
}
r, ok := tr[spt]
if ok {
return r
}
}
// otherwise, use the default resource table
return res
}
type WorkerStats struct {
@ -42,7 +71,7 @@ type WorkerStats struct {
MemUsedMin uint64
MemUsedMax uint64
GpuUsed bool // nolint
GpuUsed float64 // nolint
CpuUse uint64 // nolint
}

View File

@ -102,14 +102,14 @@ func (t *testWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
}
func (t *testWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
res := ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1]
res := storiface.ResourceTable[sealtasks.TTPreCommit2][abi.RegisteredSealProof_StackedDrg2KiBV1]
return storiface.WorkerInfo{
Hostname: "testworkerer",
Resources: storiface.WorkerResources{
MemPhysical: res.MinMemory * 3,
MemUsed: res.MinMemory,
MemSwap: 0,
MemReserved: res.MinMemory,
CPUs: 32,
GPUs: nil,
},

View File

@ -42,6 +42,7 @@ type WorkerConfig struct {
// used do provide custom proofs impl (mostly used in testing)
type ExecutorFunc func() (ffiwrapper.Storage, error)
type EnvFunc func(string) (string, bool)
type LocalWorker struct {
storage stores.Store
@ -50,6 +51,7 @@ type LocalWorker struct {
ret storiface.WorkerReturn
executor ExecutorFunc
noSwap bool
envLookup EnvFunc
// see equivalent field on WorkerConfig.
ignoreResources bool
@ -64,7 +66,7 @@ type LocalWorker struct {
closing chan struct{}
}
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
@ -82,6 +84,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
envLookup: envLookup,
ignoreResources: wcfg.IgnoreResourceFiltering,
session: uuid.New(),
closing: make(chan struct{}),
@ -115,7 +118,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
}
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
return newLocalWorker(nil, wcfg, store, local, sindex, ret, cst)
return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst)
}
type localWorkerPathProvider struct {
@ -482,6 +485,52 @@ func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}
func (l *LocalWorker) memInfo() (memPhysical, memUsed, memSwap, memSwapUsed uint64, err error) {
h, err := sysinfo.Host()
if err != nil {
return 0, 0, 0, 0, err
}
mem, err := h.Memory()
if err != nil {
return 0, 0, 0, 0, err
}
memPhysical = mem.Total
// mem.Available is memory available without swapping, it is more relevant for this calculation
memUsed = mem.Total - mem.Available
memSwap = mem.VirtualTotal
memSwapUsed = mem.VirtualUsed
if cgMemMax, cgMemUsed, cgSwapMax, cgSwapUsed, err := cgroupV1Mem(); err == nil {
if cgMemMax > 0 && cgMemMax < memPhysical {
memPhysical = cgMemMax
memUsed = cgMemUsed
}
if cgSwapMax > 0 && cgSwapMax < memSwap {
memSwap = cgSwapMax
memSwapUsed = cgSwapUsed
}
}
if cgMemMax, cgMemUsed, cgSwapMax, cgSwapUsed, err := cgroupV2Mem(); err == nil {
if cgMemMax > 0 && cgMemMax < memPhysical {
memPhysical = cgMemMax
memUsed = cgMemUsed
}
if cgSwapMax > 0 && cgSwapMax < memSwap {
memSwap = cgSwapMax
memSwapUsed = cgSwapUsed
}
}
if l.noSwap {
memSwap = 0
memSwapUsed = 0
}
return memPhysical, memUsed, memSwap, memSwapUsed, nil
}
func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
hostname, err := os.Hostname() // TODO: allow overriding from config
if err != nil {
@ -493,30 +542,29 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
log.Errorf("getting gpu devices failed: %+v", err)
}
h, err := sysinfo.Host()
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("getting host info: %w", err)
}
mem, err := h.Memory()
memPhysical, memUsed, memSwap, memSwapUsed, err := l.memInfo()
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("getting memory info: %w", err)
}
memSwap := mem.VirtualTotal
if l.noSwap {
memSwap = 0
resEnv, err := storiface.ParseResourceEnv(func(key, def string) (string, bool) {
return l.envLookup(key)
})
if err != nil {
return storiface.WorkerInfo{}, xerrors.Errorf("interpreting resource env vars: %w", err)
}
return storiface.WorkerInfo{
Hostname: hostname,
IgnoreResources: l.ignoreResources,
Resources: storiface.WorkerResources{
MemPhysical: mem.Total,
MemPhysical: memPhysical,
MemUsed: memUsed,
MemSwap: memSwap,
MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
MemSwapUsed: memSwapUsed,
CPUs: uint64(runtime.NumCPU()),
GPUs: gpus,
Resources: resEnv,
},
}, nil
}

View File

@ -20,7 +20,7 @@ import (
type trackedWork struct {
job storiface.WorkerJob
worker WorkerID
worker storiface.WorkerID
workerHostname string
}
@ -58,7 +58,7 @@ func (wt *workTracker) onDone(ctx context.Context, callID storiface.CallID) {
delete(wt.running, callID)
}
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid storiface.WorkerID, wi storiface.WorkerInfo, sid storage.SectorRef, task sealtasks.TaskType, cb func() (storiface.CallID, error)) (storiface.CallID, error) {
tracked := func(rw int, callID storiface.CallID) trackedWork {
return trackedWork{
job: storiface.WorkerJob{
@ -122,7 +122,7 @@ func (wt *workTracker) track(ctx context.Context, ready chan struct{}, wid Worke
return callID, err
}
func (wt *workTracker) worker(wid WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
func (wt *workTracker) worker(wid storiface.WorkerID, wi storiface.WorkerInfo, w Worker) *trackedWorker {
return &trackedWorker{
Worker: w,
wid: wid,
@ -152,7 +152,7 @@ func (wt *workTracker) Running() ([]trackedWork, []trackedWork) {
type trackedWorker struct {
Worker
wid WorkerID
wid storiface.WorkerID
workerInfo storiface.WorkerInfo
execute chan struct{} // channel blocking execution in case we're waiting for resources but the task is ready to execute

1
go.mod
View File

@ -15,6 +15,7 @@ require (
github.com/buger/goterm v1.0.3
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/cockroachdb/pebble v0.0.0-20201001221639-879f3bfeef07
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327
github.com/coreos/go-systemd/v22 v22.3.2
github.com/detailyang/go-fallocate v0.0.0-20180908115635-432fa640bd2e
github.com/dgraph-io/badger/v2 v2.2007.2

1
go.sum
View File

@ -174,6 +174,7 @@ github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e h1:fY5BOSpyZCqRo5O
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1 h1:q763qf9huN11kDQavWsoZXJNW3xEE4JJyHa5Q25/sd8=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/cilium/ebpf v0.2.0 h1:Fv93L3KKckEcEHR3oApXVzyBTDA8WAm6VXhPE00N3f8=
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=