sectorstorage: Compact assigned windows

Łukasz Magiera 2020-08-28 18:26:17 +02:00
parent 20ffce5c0d
commit 11b11e416b
6 changed files with 134 additions and 14 deletions

View File

@@ -156,7 +156,7 @@ var sealingJobsCmd = &cli.Command{
 		// oldest first
 		sort.Slice(lines, func(i, j int) bool {
 			if lines[i].RunWait != lines[j].RunWait {
-				return !lines[i].RunWait // already running tasks first
+				return lines[i].RunWait < lines[j].RunWait
 			}
 			return lines[i].Start.Before(lines[j].Start)
 		})
@@ -176,9 +176,9 @@ var sealingJobsCmd = &cli.Command{
 		_, _ = fmt.Fprintf(tw, "ID\tSector\tWorker\tHostname\tTask\tState\tTime\n")
 		for _, l := range lines {
-			state := "assigned"
-			if !l.RunWait {
-				state = "running"
+			state := "running"
+			if l.RunWait != 0 {
+				state = fmt.Sprintf("assigned(%d)", l.RunWait-1)
 			}
 			_, _ = fmt.Fprintf(tw, "%d\t%d\t%d\t%s\t%s\t%s\t%s\n", l.ID, l.Sector.Number, l.wid, workerHostnames[l.wid], l.Task.Short(), state, time.Now().Sub(l.Start).Truncate(time.Millisecond*100))
 		}
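For reference, a minimal standalone sketch (a hypothetical helper, not part of this commit) of how the CLI above reads the new integer RunWait field: 0 means the job is already running on its worker, and N >= 1 means it is still assigned, queued in the worker's (N-1)-th scheduling window.

package main

import "fmt"

// jobState mirrors the mapping in the hunk above: RunWait 0 prints "running",
// RunWait N (N >= 1) prints "assigned(N-1)", the index of the scheduling
// window the job is queued in on its worker.
func jobState(runWait int) string {
	if runWait == 0 {
		return "running"
	}
	return fmt.Sprintf("assigned(%d)", runWait-1)
}

func main() {
	fmt.Println(jobState(0)) // running
	fmt.Println(jobState(1)) // assigned(0) - first queued window
	fmt.Println(jobState(3)) // assigned(2) - third queued window
}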

View File

@@ -22,17 +22,17 @@ func (r Resources) MultiThread() bool {
 var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredSealProof]Resources{
 	sealtasks.TTAddPiece: {
-		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{ // This is probably a bit conservative
-			MaxMemory: 64 << 30,
-			MinMemory: 64 << 30,
+		abi.RegisteredSealProof_StackedDrg64GiBV1: Resources{
+			MaxMemory: 8 << 30,
+			MinMemory: 8 << 30,
 			Threads: 1,
 			BaseMinMemory: 1 << 30,
 		},
-		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{ // This is probably a bit conservative
-			MaxMemory: 32 << 30,
-			MinMemory: 32 << 30,
+		abi.RegisteredSealProof_StackedDrg32GiBV1: Resources{
+			MaxMemory: 4 << 30,
+			MinMemory: 4 << 30,
 			Threads: 1,
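The memory figures use Go's shift notation for bytes (1 << 30 is one GiB): AddPiece now reserves 8 GiB instead of 64 GiB for 64 GiB sectors, and 4 GiB instead of 32 GiB for 32 GiB sectors. A rough back-of-the-envelope sketch (the 128 GiB worker is a made-up figure, not from the repo) of what the lower reservation buys, counting by memory alone:

package main

import "fmt"

func main() {
	const gib = uint64(1) << 30 // 1 GiB in bytes

	workerRAM := 128 * gib          // hypothetical worker
	oldAddPiece := uint64(32) << 30 // previous MinMemory for 32 GiB sectors
	newAddPiece := uint64(4) << 30  // MinMemory after this commit

	fmt.Println(workerRAM / oldAddPiece) // 4  - AddPiece tasks that fit before
	fmt.Println(workerRAM / newAddPiece) // 32 - AddPiece tasks that fit after
}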

View File

@@ -153,7 +153,7 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
 		workerClosing: make(chan WorkerID),
 		schedule: make(chan *workerRequest),
-		windowRequests: make(chan *schedWindowRequest),
+		windowRequests: make(chan *schedWindowRequest, 20),
 		schedQueue: &requestQueue{},
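The only change in this hunk is giving windowRequests a buffer of 20. A self-contained sketch (the request type is stubbed out; the capacity matches the diff) of what the buffer changes: workers can post window requests without blocking the instant the scheduler loop is busy elsewhere.

package main

import "fmt"

type schedWindowRequest struct{ worker int } // stub standing in for the scheduler's type

func main() {
	windowRequests := make(chan *schedWindowRequest, 20)

	// Up to 20 requests queue without a receiver; only the 21st send would block.
	for i := 0; i < 20; i++ {
		windowRequests <- &schedWindowRequest{worker: i}
	}
	fmt.Println(len(windowRequests), cap(windowRequests)) // 20 20
}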
@@ -561,6 +561,8 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 			worker.wndLk.Lock()
+			windowsRequested -= sh.workerCompactWindows(worker, wid)
+
 		assignLoop:
 			// process windows in order
 			for len(worker.activeWindows) > 0 {
@@ -588,6 +590,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 					go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
 				}
 
+				// Note: we're not freeing window.allocated resources here very much on purpose
 				worker.activeWindows[0].todo = worker.activeWindows[0].todo[1:]
 			}
@@ -603,6 +606,58 @@ func (sh *scheduler) runWorker(wid WorkerID) {
 	}()
 }
 
+func (sh *scheduler) workerCompactWindows(worker *workerHandle, wid WorkerID) int {
+	// move tasks from older windows to newer windows if older windows
+	// still can fit them
+	if len(worker.activeWindows) > 1 {
+		for wi, window := range worker.activeWindows[1:] {
+			lower := worker.activeWindows[wi]
+			var moved []int
+
+			for ti, todo := range window.todo {
+				needRes := ResourceTable[todo.taskType][sh.spt]
+				if !lower.allocated.canHandleRequest(needRes, wid, worker.info.Resources) {
+					continue
+				}
+
+				moved = append(moved, ti)
+				lower.todo = append(lower.todo, todo)
+				lower.allocated.add(worker.info.Resources, needRes)
+				window.allocated.free(worker.info.Resources, needRes)
+			}
+
+			if len(moved) > 0 {
+				newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved))
+				for i, t := range window.todo {
+					if len(moved) > 0 && moved[0] == i {
+						moved = moved[1:]
+						continue
+					}
+
+					newTodo = append(newTodo, t)
+				}
+				window.todo = newTodo
+			}
+		}
+	}
+
+	var compacted int
+	var newWindows []*schedWindow
+
+	for _, window := range worker.activeWindows {
+		if len(window.todo) == 0 {
+			compacted++
+			continue
+		}
+
+		newWindows = append(newWindows, window)
+	}
+
+	worker.activeWindows = newWindows
+
+	return compacted
+}
+
 func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
 	needRes := ResourceTable[req.taskType][sh.spt]
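The compaction added above is a small bin-packing pass: for each window after the first, move any queued task down into the immediately older window whenever that window's allocated resources can still accept it, then drop windows that end up empty; the count of dropped windows is subtracted from windowsRequested so the worker will ask the scheduler for fresh ones. A stripped-down, self-contained sketch of the same pattern (plain integers stand in for tasks, and a simple item count stands in for the canHandleRequest/add/free resource accounting):

package main

import "fmt"

// compact moves items from each window into the immediately older one while
// it still has capacity, then drops windows left empty. It returns the new
// window list and how many windows were freed, mirroring workerCompactWindows.
func compact(windows [][]int, capacity int) ([][]int, int) {
	for wi := 1; wi < len(windows); wi++ {
		lower := &windows[wi-1]
		var kept []int
		for _, item := range windows[wi] {
			if len(*lower) < capacity {
				*lower = append(*lower, item) // fits into the older window
				continue
			}
			kept = append(kept, item) // stays in its own window
		}
		windows[wi] = kept
	}

	var out [][]int
	freed := 0
	for _, w := range windows {
		if len(w) == 0 {
			freed++
			continue
		}
		out = append(out, w)
	}
	return out, freed
}

func main() {
	windows, freed := compact([][]int{{1}, {2}, {3, 4}}, 2)
	fmt.Println(windows, freed) // [[1 2] [3 4]] 1
}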

View File

@@ -522,3 +522,68 @@ func BenchmarkTrySched(b *testing.B) {
 	b.Run("1w-500q", test(1, 500))
 	b.Run("200w-400q", test(200, 400))
 }
+
+func TestWindowCompact(t *testing.T) {
+	sh := scheduler{
+		spt: abi.RegisteredSealProof_StackedDrg32GiBV1,
+	}
+
+	test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) {
+		return func(t *testing.T) {
+			wh := &workerHandle{
+				info: storiface.WorkerInfo{
+					Resources: decentWorkerResources,
+				},
+			}
+
+			for _, windowTasks := range start {
+				window := &schedWindow{}
+
+				for _, task := range windowTasks {
+					window.todo = append(window.todo, &workerRequest{taskType: task})
+					window.allocated.add(wh.info.Resources, ResourceTable[task][sh.spt])
+				}
+
+				wh.activeWindows = append(wh.activeWindows, window)
+			}
+
+			n := sh.workerCompactWindows(wh, 0)
+			require.Equal(t, len(start)-len(expect), n)
+
+			for wi, tasks := range expect {
+				var expectRes activeResources
+
+				for ti, task := range tasks {
+					require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
+					expectRes.add(wh.info.Resources, ResourceTable[task][sh.spt])
+				}
+
+				require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
+				require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi)
+				require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi)
+				require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi)
+			}
+		}
+	}
+
+	t.Run("2-pc1-windows", test(
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1}},
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
+	)
+
+	t.Run("1-window", test(
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}},
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
+	)
+
+	t.Run("2-pc2-windows", test(
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}},
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}),
+	)
+
+	t.Run("2pc1-pc1ap", test(
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece}},
+		[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1}}),
+	)
+}

View File

@@ -33,13 +33,13 @@ func (m *Manager) WorkerJobs() map[uint64][]storiface.WorkerJob {
 		out[uint64(id)] = handle.wt.Running()
 		handle.wndLk.Lock()
-		for _, window := range handle.activeWindows {
+		for wi, window := range handle.activeWindows {
 			for _, request := range window.todo {
 				out[uint64(id)] = append(out[uint64(id)], storiface.WorkerJob{
 					ID: 0,
 					Sector: request.sector,
 					Task: request.taskType,
-					RunWait: true,
+					RunWait: wi + 1,
 					Start: request.start,
 				})
 			}

View File

@@ -37,6 +37,6 @@ type WorkerJob struct {
 	Sector abi.SectorID
 	Task sealtasks.TaskType
-	RunWait bool
+	RunWait int // 0 - running, 1+ - assigned
 	Start time.Time
 }