Merge pull request #4116 from filecoin-project/feat/sched-par-pcc2

sched: Allow some single-thread tasks to run in parallel with PC2/C2
This commit is contained in:
Łukasz Magiera 2020-10-01 01:35:05 +02:00 committed by GitHub
commit 954d806367
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 74 deletions

View File

@ -10,14 +10,38 @@ type Resources struct {
MinMemory uint64 // What Must be in RAM for decent perf
MaxMemory uint64 // Memory required (swap + ram)
Threads int // -1 = multithread
CanGPU bool
MaxParallelism int // -1 = multithread
CanGPU bool
BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
}
func (r Resources) MultiThread() bool {
return r.Threads == -1
/*
ParallelNum / ParallelDenom is the fraction of a worker's CPU threads that
Resources.Threads reports for a task whose MaxParallelism is -1 (multithread).

The computation uses integer arithmetic, so the result is rounded down.
With the default ratio of 92/100:

12 * 0.92 = 11
16 * 0.92 = 14
24 * 0.92 = 22
32 * 0.92 = 29
64 * 0.92 = 58
128 * 0.92 = 117
*/
var ParallelNum uint64 = 92
var ParallelDenom uint64 = 100
// Threads returns the number of CPU threads to account for this task on a
// worker that has wcpus CPUs.
//
// A non-negative MaxParallelism is returned as-is. A negative MaxParallelism
// (-1 by convention) marks the task as multithreaded: it is charged
// ParallelNum/ParallelDenom of wcpus, rounded down. If that rounds down to
// zero, the task is charged all of wcpus instead.
//
// TODO: Take NUMA into account
func (r Resources) Threads(wcpus uint64) uint64 {
	if r.MaxParallelism >= 0 {
		return uint64(r.MaxParallelism)
	}

	// Any negative value is treated as "multithread". Converting it straight
	// to uint64 would wrap around to a huge thread count.
	if ParallelDenom == 0 {
		// ParallelDenom is a mutable package variable; a zero value would
		// make the division below panic, so fall back to every CPU.
		return wcpus
	}

	n := (wcpus * ParallelNum) / ParallelDenom
	if n == 0 {
		return wcpus
	}
	return n
}
var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
@ -26,7 +50,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 30,
MinMemory: 8 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -34,7 +58,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 4 << 30,
MinMemory: 4 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -42,7 +66,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 30,
},
@ -50,7 +74,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 2 << 10,
},
@ -58,7 +82,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 8 << 20,
},
@ -68,7 +92,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 128 << 30,
MinMemory: 112 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 10 << 20,
},
@ -76,7 +100,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 64 << 30,
MinMemory: 56 << 30,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 10 << 20,
},
@ -84,7 +108,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 768 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 1 << 20,
},
@ -92,7 +116,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 2 << 10,
},
@ -100,35 +124,35 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
MaxParallelism: 1,
BaseMinMemory: 8 << 20,
},
},
sealtasks.TTPreCommit2: {
abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
MaxMemory: 64 << 30,
MinMemory: 64 << 30,
MaxMemory: 30 << 30,
MinMemory: 30 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 60 << 30,
BaseMinMemory: 1 << 30,
},
abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
MaxMemory: 32 << 30,
MinMemory: 32 << 30,
MaxMemory: 15 << 30,
MinMemory: 15 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 30 << 30,
BaseMinMemory: 1 << 30,
},
abi.RegisteredSealProof_StackedDrg512MiBV1: Resources{
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 1 << 30,
},
@ -136,7 +160,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 2 << 10,
},
@ -144,7 +168,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: -1,
MaxParallelism: -1,
BaseMinMemory: 8 << 20,
},
@ -154,7 +178,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -162,7 +186,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -170,7 +194,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 30,
MinMemory: 1 << 30,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 1 << 30,
},
@ -178,7 +202,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 2 << 10,
},
@ -186,7 +210,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 0,
MaxParallelism: 0,
BaseMinMemory: 8 << 20,
},
@ -196,8 +220,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 190 << 30, // TODO: Confirm
MinMemory: 60 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 64 << 30, // params
},
@ -205,8 +229,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 150 << 30, // TODO: ~30G of this should really be BaseMaxMemory
MinMemory: 30 << 30,
Threads: -1,
CanGPU: true,
MaxParallelism: -1,
CanGPU: true,
BaseMinMemory: 32 << 30, // params
},
@ -214,8 +238,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 3 << 29, // 1.5G
MinMemory: 1 << 30,
Threads: 1, // This is fine
CanGPU: true,
MaxParallelism: 1, // This is fine
CanGPU: true,
BaseMinMemory: 10 << 30,
},
@ -223,8 +247,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 2 << 10,
MinMemory: 2 << 10,
Threads: 1,
CanGPU: true,
MaxParallelism: 1,
CanGPU: true,
BaseMinMemory: 2 << 10,
},
@ -232,8 +256,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 8 << 20,
MinMemory: 8 << 20,
Threads: 1,
CanGPU: true,
MaxParallelism: 1,
CanGPU: true,
BaseMinMemory: 8 << 20,
},
@ -243,8 +267,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -252,8 +276,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -261,8 +285,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -270,8 +294,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},
@ -279,8 +303,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources
MaxMemory: 1 << 20,
MinMemory: 1 << 20,
Threads: 0,
CanGPU: false,
MaxParallelism: 0,
CanGPU: false,
BaseMinMemory: 0,
},

View File

@ -28,12 +28,7 @@ func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResource
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
a.gpuUsed = r.CanGPU
if r.MultiThread() {
a.cpuUse += wr.CPUs
} else {
a.cpuUse += uint64(r.Threads)
}
a.cpuUse += r.Threads(wr.CPUs)
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
}
@ -42,12 +37,7 @@ func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = false
}
if r.MultiThread() {
a.cpuUse -= wr.CPUs
} else {
a.cpuUse -= uint64(r.Threads)
}
a.cpuUse -= r.Threads(wr.CPUs)
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
}
@ -68,16 +58,9 @@ func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, call
return false
}
if needRes.MultiThread() {
if a.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d for %s; multicore process needs %d threads, %d in use, target %d", wid, caller, res.CPUs, a.cpuUse, res.CPUs)
return false
}
} else {
if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads, a.cpuUse, res.CPUs)
return false
}
if a.cpuUse+needRes.Threads(res.CPUs) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d for %s; not enough threads, need %d, %d in use, target %d", wid, caller, needRes.Threads(res.CPUs), a.cpuUse, res.CPUs)
return false
}
if len(res.GPUs) > 0 && needRes.CanGPU {

View File

@ -290,6 +290,9 @@ func TestSched(t *testing.T) {
}
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
ParallelNum = 1
ParallelDenom = 1
return func(t *testing.T) {
index := stores.NewIndex()