sealing: fix: Make DataCid resource env vars make more sense

This commit is contained in:
Łukasz Magiera 2022-08-29 15:57:58 +02:00
parent 28722de72d
commit e4ed3f8cf8
3 changed files with 80 additions and 21 deletions

View File

@ -96,6 +96,11 @@ func (a TaskType) WorkerType() string {
} }
} }
// SectorSized returns true if the task operates on a specific sector size
func (a TaskType) SectorSized() bool {
return a != TTDataCid
}
func (a TaskType) MuchLess(b TaskType) (bool, bool) { func (a TaskType) MuchLess(b TaskType) (bool, bool) {
oa, ob := order[a], order[b] oa, ob := order[a], order[b]
oneNegative := oa^ob < 0 oneNegative := oa^ob < 0

View File

@ -6,6 +6,7 @@ import (
"strconv" "strconv"
"strings" "strings"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
@ -13,6 +14,8 @@ import (
"github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
) )
var log = logging.Logger("resources")
type Resources struct { type Resources struct {
MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf MinMemory uint64 `envname:"MIN_MEMORY"` // What Must be in RAM for decent perf
MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram; peak memory usage during task execution) MaxMemory uint64 `envname:"MAX_MEMORY"` // Memory required (swap + ram; peak memory usage during task execution)
@ -32,16 +35,14 @@ type Resources struct {
} }
/* /*
Percent of threads to allocate to parallel tasks
Percent of threads to allocate to parallel tasks 12 * 0.92 = 11
16 * 0.92 = 14
12 * 0.92 = 11 24 * 0.92 = 22
16 * 0.92 = 14 32 * 0.92 = 29
24 * 0.92 = 22 64 * 0.92 = 58
32 * 0.92 = 29 128 * 0.92 = 117
64 * 0.92 = 58
128 * 0.92 = 117
*/ */
var ParallelNum uint64 = 92 var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100 var ParallelDenom uint64 = 100
@ -610,20 +611,29 @@ func ParseResourceEnv(lookup func(key, def string) (string, bool)) (map[sealtask
envval, found := lookup(taskType.Short()+"_"+shortSize+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface())) envval, found := lookup(taskType.Short()+"_"+shortSize+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
if !found { if !found {
// special multicore SDR handling // see if a non-size-specific envvar is set
if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" { envval, found = lookup(taskType.Short()+"_"+envname, fmt.Sprint(rr.Elem().Field(i).Interface()))
v, ok := rr.Elem().Field(i).Addr().Interface().(*int) if !found {
if !ok { // special multicore SDR handling
// can't happen, but let's not panic if (taskType == sealtasks.TTPreCommit1 || taskType == sealtasks.TTUnseal) && envname == "MAX_PARALLELISM" {
return nil, xerrors.Errorf("res.MAX_PARALLELISM is not int (!?): %w", err) v, ok := rr.Elem().Field(i).Addr().Interface().(*int)
} if !ok {
*v, err = getSDRThreads(lookup) // can't happen, but let's not panic
if err != nil { return nil, xerrors.Errorf("res.MAX_PARALLELISM is not int (!?): %w", err)
return nil, err }
*v, err = getSDRThreads(lookup)
if err != nil {
return nil, err
}
} }
continue
} }
continue } else {
if !taskType.SectorSized() {
log.Errorw("sector-size independent task resource var specified with sector-sized envvar", "env", taskType.Short()+"_"+shortSize+"_"+envname, "use", taskType.Short()+"_"+envname)
}
} }
v := rr.Elem().Field(i).Addr().Interface() v := rr.Elem().Field(i).Addr().Interface()

View File

@ -12,9 +12,12 @@ import (
) )
func TestListResourceVars(t *testing.T) { func TestListResourceVars(t *testing.T) {
seen := map[string]struct{}{}
_, err := ParseResourceEnv(func(key, def string) (string, bool) { _, err := ParseResourceEnv(func(key, def string) (string, bool) {
if def != "" { _, s := seen[key]
if !s && def != "" {
fmt.Printf("%s=%s\n", key, def) fmt.Printf("%s=%s\n", key, def)
seen[key] = struct{}{}
} }
return "", false return "", false
@ -75,3 +78,44 @@ func TestListResourceSDRMulticoreOverride(t *testing.T) {
require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism) require.Equal(t, 9001, rt[sealtasks.TTPreCommit1][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism) require.Equal(t, 9001, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
} }
func TestUnsizedSetAll(t *testing.T) {
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
if key == "UNS_MAX_PARALLELISM" {
return "2", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg32GiBV1].MaxParallelism)
require.Equal(t, 2, rt[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg8MiBV1].MaxParallelism)
// check that defaults don't get mutated
require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
}
func TestUnsizedNotPreferred(t *testing.T) {
rt, err := ParseResourceEnv(func(key, def string) (string, bool) {
if key == "DC_MAX_PARALLELISM" {
return "2", true
}
// test should also print a warning for DataCid as it's not sector-size dependent
if key == "DC_64G_MAX_PARALLELISM" {
return "1", true
}
return "", false
})
require.NoError(t, err)
require.Equal(t, 2, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
require.Equal(t, 2, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg32GiBV1].MaxParallelism)
require.Equal(t, 1, rt[sealtasks.TTDataCid][stabi.RegisteredSealProof_StackedDrg64GiBV1_1].MaxParallelism)
// check that defaults don't get mutated
require.Equal(t, 1, ResourceTable[sealtasks.TTUnseal][stabi.RegisteredSealProof_StackedDrg2KiBV1_1].MaxParallelism)
}