Merge pull request #82 from filecoin-project/feat/sched-diag
Fix requestQueue iteration, diag methods
This commit is contained in:
commit
add7f0b4b0
@ -494,6 +494,10 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
|
||||
return m.storage.FsStat(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Manager) SchedDiag(ctx context.Context) (interface{}, error) {
|
||||
return m.sched.Info(ctx)
|
||||
}
|
||||
|
||||
func (m *Manager) Close(ctx context.Context) error {
|
||||
return m.sched.Close(ctx)
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
package sectorstorage
|
||||
|
||||
import "container/heap"
|
||||
import "sort"
|
||||
|
||||
type requestQueue []*workerRequest
|
||||
|
||||
@ -24,21 +24,22 @@ func (q requestQueue) Swap(i, j int) {
|
||||
q[j].index = j
|
||||
}
|
||||
|
||||
func (q *requestQueue) Push(x interface{}) {
|
||||
func (q *requestQueue) Push(x *workerRequest) {
|
||||
n := len(*q)
|
||||
item := x.(*workerRequest)
|
||||
item := x
|
||||
item.index = n
|
||||
*q = append(*q, item)
|
||||
sort.Sort(q)
|
||||
}
|
||||
|
||||
func (q *requestQueue) Pop() interface{} {
|
||||
func (q *requestQueue) Remove(i int) *workerRequest {
|
||||
old := *q
|
||||
n := len(old)
|
||||
item := old[n-1]
|
||||
old[n-1] = nil // avoid memory leak
|
||||
item.index = -1 // for safety
|
||||
item := old[i]
|
||||
old[i] = old[n-1]
|
||||
old[n-1] = nil
|
||||
item.index = -1
|
||||
*q = old[0 : n-1]
|
||||
sort.Sort(q)
|
||||
return item
|
||||
}
|
||||
|
||||
var _ heap.Interface = &requestQueue{}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package sectorstorage
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/filecoin-project/sector-storage/sealtasks"
|
||||
@ -10,19 +10,51 @@ import (
|
||||
func TestRequestQueue(t *testing.T) {
|
||||
rq := &requestQueue{}
|
||||
|
||||
heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece})
|
||||
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1})
|
||||
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit2})
|
||||
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1})
|
||||
heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece})
|
||||
rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
|
||||
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
|
||||
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2})
|
||||
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
|
||||
rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
|
||||
|
||||
pt := heap.Pop(rq).(*workerRequest)
|
||||
dump := func(s string) {
|
||||
fmt.Println("---")
|
||||
fmt.Println(s)
|
||||
|
||||
for sqi := 0; sqi < rq.Len(); sqi++ {
|
||||
task := (*rq)[sqi]
|
||||
fmt.Println(sqi, task.taskType)
|
||||
}
|
||||
}
|
||||
|
||||
dump("start")
|
||||
|
||||
pt := rq.Remove(0)
|
||||
|
||||
dump("pop 1")
|
||||
|
||||
if pt.taskType != sealtasks.TTPreCommit2 {
|
||||
t.Error("expected precommit2, got", pt.taskType)
|
||||
}
|
||||
|
||||
pt = heap.Pop(rq).(*workerRequest)
|
||||
pt = rq.Remove(0)
|
||||
|
||||
dump("pop 2")
|
||||
|
||||
if pt.taskType != sealtasks.TTPreCommit1 {
|
||||
t.Error("expected precommit1, got", pt.taskType)
|
||||
}
|
||||
|
||||
pt = rq.Remove(1)
|
||||
|
||||
dump("pop 3")
|
||||
|
||||
if pt.taskType != sealtasks.TTAddPiece {
|
||||
t.Error("expected addpiece, got", pt.taskType)
|
||||
}
|
||||
|
||||
pt = rq.Remove(0)
|
||||
|
||||
dump("pop 4")
|
||||
|
||||
if pt.taskType != sealtasks.TTPreCommit1 {
|
||||
t.Error("expected precommit1, got", pt.taskType)
|
||||
|
71
sched.go
71
sched.go
@ -1,7 +1,6 @@
|
||||
package sectorstorage
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
@ -69,6 +68,8 @@ type scheduler struct {
|
||||
schedQueue *requestQueue
|
||||
openWindows []*schedWindowRequest
|
||||
|
||||
info chan func(interface{})
|
||||
|
||||
closing chan struct{}
|
||||
closed chan struct{}
|
||||
testSync chan struct{} // used for testing
|
||||
@ -123,8 +124,8 @@ type workerRequest struct {
|
||||
index int // The index of the item in the heap.
|
||||
|
||||
indexHeap int
|
||||
ret chan<- workerResponse
|
||||
ctx context.Context
|
||||
ret chan<- workerResponse
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
type workerResponse struct {
|
||||
@ -148,6 +149,8 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
|
||||
|
||||
schedQueue: &requestQueue{},
|
||||
|
||||
info: make(chan func(interface{})),
|
||||
|
||||
closing: make(chan struct{}),
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
@ -193,6 +196,17 @@ func (r *workerRequest) respond(err error) {
|
||||
}
|
||||
}
|
||||
|
||||
type SchedDiagRequestInfo struct {
|
||||
Sector abi.SectorID
|
||||
TaskType sealtasks.TaskType
|
||||
Priority int
|
||||
}
|
||||
|
||||
type SchedDiagInfo struct {
|
||||
Requests []SchedDiagRequestInfo
|
||||
OpenWindows []WorkerID
|
||||
}
|
||||
|
||||
func (sh *scheduler) runSched() {
|
||||
defer close(sh.closed)
|
||||
|
||||
@ -207,7 +221,7 @@ func (sh *scheduler) runSched() {
|
||||
sh.dropWorker(wid)
|
||||
|
||||
case req := <-sh.schedule:
|
||||
heap.Push(sh.schedQueue, req)
|
||||
sh.schedQueue.Push(req)
|
||||
sh.trySched()
|
||||
|
||||
if sh.testSync != nil {
|
||||
@ -217,6 +231,9 @@ func (sh *scheduler) runSched() {
|
||||
sh.openWindows = append(sh.openWindows, req)
|
||||
sh.trySched()
|
||||
|
||||
case ireq := <-sh.info:
|
||||
ireq(sh.diag())
|
||||
|
||||
case <-sh.closing:
|
||||
sh.schedClose()
|
||||
return
|
||||
@ -224,6 +241,26 @@ func (sh *scheduler) runSched() {
|
||||
}
|
||||
}
|
||||
|
||||
func (sh *scheduler) diag() SchedDiagInfo {
|
||||
var out SchedDiagInfo
|
||||
|
||||
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
|
||||
task := (*sh.schedQueue)[sqi]
|
||||
|
||||
out.Requests = append(out.Requests, SchedDiagRequestInfo{
|
||||
Sector: task.sector,
|
||||
TaskType: task.taskType,
|
||||
Priority: task.priority,
|
||||
})
|
||||
}
|
||||
|
||||
for _, window := range sh.openWindows {
|
||||
out.OpenWindows = append(out.OpenWindows, window.worker)
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (sh *scheduler) trySched() {
|
||||
/*
|
||||
This assigns tasks to workers based on:
|
||||
@ -244,7 +281,7 @@ func (sh *scheduler) trySched() {
|
||||
windows := make([]schedWindow, len(sh.openWindows))
|
||||
acceptableWindows := make([][]int, sh.schedQueue.Len())
|
||||
|
||||
log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
|
||||
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
|
||||
|
||||
// Step 1
|
||||
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
|
||||
@ -306,6 +343,9 @@ func (sh *scheduler) trySched() {
|
||||
})
|
||||
}
|
||||
|
||||
log.Debugf("SCHED windows: %+v", windows)
|
||||
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
|
||||
|
||||
// Step 2
|
||||
scheduled := 0
|
||||
|
||||
@ -318,14 +358,14 @@ func (sh *scheduler) trySched() {
|
||||
wid := sh.openWindows[wnd].worker
|
||||
wr := sh.workers[wid].info.Resources
|
||||
|
||||
log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
|
||||
log.Debugf("SCHED try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
|
||||
|
||||
// TODO: allow bigger windows
|
||||
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
|
||||
log.Debugf("SCHED ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
|
||||
|
||||
windows[wnd].allocated.add(wr, needRes)
|
||||
|
||||
@ -340,7 +380,7 @@ func (sh *scheduler) trySched() {
|
||||
|
||||
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
|
||||
|
||||
heap.Remove(sh.schedQueue, sqi)
|
||||
sh.schedQueue.Remove(sqi)
|
||||
sqi--
|
||||
scheduled++
|
||||
}
|
||||
@ -623,6 +663,21 @@ func (sh *scheduler) schedClose() {
|
||||
}
|
||||
}
|
||||
|
||||
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {
|
||||
ch := make(chan interface{}, 1)
|
||||
|
||||
sh.info <- func(res interface{}) {
|
||||
ch <- res
|
||||
}
|
||||
|
||||
select {
|
||||
case res := <-ch:
|
||||
return res, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (sh *scheduler) Close(ctx context.Context) error {
|
||||
close(sh.closing)
|
||||
select {
|
||||
|
Loading…
Reference in New Issue
Block a user