From ced1d6e0875c5b535b346ef6f4be2ab1e3ec51b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 8 May 2020 01:38:05 +0200 Subject: [PATCH] Priority queue for tasks --- request_queue.go | 36 ++++++++++++++++++++++++++++++++++++ request_queue_test.go | 30 ++++++++++++++++++++++++++++++ sched.go | 25 ++++++++++--------------- sealtasks/task.go | 14 ++++++++++++++ 4 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 request_queue.go create mode 100644 request_queue_test.go diff --git a/request_queue.go b/request_queue.go new file mode 100644 index 000000000..09ca7ae3f --- /dev/null +++ b/request_queue.go @@ -0,0 +1,36 @@ +package sectorstorage + +import "container/heap" + +type requestQueue []*workerRequest + +func (q requestQueue) Len() int { return len(q) } + +func (q requestQueue) Less(i, j int) bool { + return q[i].taskType.Less(q[j].taskType) +} + +func (q requestQueue) Swap(i, j int) { + q[i], q[j] = q[j], q[i] + q[i].index = i + q[j].index = j +} + +func (q *requestQueue) Push(x interface{}) { + n := len(*q) + item := x.(*workerRequest) + item.index = n + *q = append(*q, item) +} + +func (q *requestQueue) Pop() interface{} { + old := *q + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *q = old[0 : n-1] + return item +} + +var _ heap.Interface = &requestQueue{} diff --git a/request_queue_test.go b/request_queue_test.go new file mode 100644 index 000000000..9bf231e39 --- /dev/null +++ b/request_queue_test.go @@ -0,0 +1,30 @@ +package sectorstorage + +import ( + "container/heap" + "testing" + + "github.com/filecoin-project/sector-storage/sealtasks" +) + +func TestRequestQueue(t *testing.T) { + rq := &requestQueue{} + + heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit2}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) + heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) + + pt := heap.Pop(rq).(*workerRequest) + + if pt.taskType != sealtasks.TTPreCommit2 { + t.Error("expected precommit2, got", pt.taskType) + } + + pt = heap.Pop(rq).(*workerRequest) + + if pt.taskType != sealtasks.TTPreCommit1 { + t.Error("expected precommit1, got", pt.taskType) + } +} diff --git a/sched.go b/sched.go index d7745da36..019febda4 100644 --- a/sched.go +++ b/sched.go @@ -1,7 +1,7 @@ package sectorstorage import ( - "container/list" + "container/heap" "context" "sort" "sync" @@ -41,7 +41,7 @@ type scheduler struct { workerFree chan WorkerID closing chan struct{} - schedQueue *list.List // List[*workerRequest] + schedQueue *requestQueue } func newScheduler(spt abi.RegisteredProof) *scheduler { @@ -60,7 +60,7 @@ func newScheduler(spt abi.RegisteredProof) *scheduler { workerFree: make(chan WorkerID), closing: make(chan struct{}), - schedQueue: list.New(), + schedQueue: &requestQueue{}, } } @@ -101,6 +101,8 @@ type workerRequest struct { prepare WorkerAction work WorkerAction + index int // The index of the item in the heap. + ret chan<- workerResponse ctx context.Context } @@ -154,7 +156,7 @@ func (sh *scheduler) runSched() { continue } - sh.schedQueue.PushBack(req) + heap.Push(sh.schedQueue, req) case wid := <-sh.workerFree: sh.onWorkerFreed(wid) case <-sh.closing: @@ -173,8 +175,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { return } - for e := sh.schedQueue.Front(); e != nil; e = e.Next() { - req := e.Value.(*workerRequest) + for i := 0; i < sh.schedQueue.Len(); i++ { + req := (*sh.schedQueue)[i] ok, err := req.sel.Ok(req.ctx, req.taskType, w) if err != nil { @@ -193,15 +195,8 @@ func (sh *scheduler) onWorkerFreed(wid WorkerID) { } if scheduled { - pe := e.Prev() - sh.schedQueue.Remove(e) - if pe == nil { - pe = sh.schedQueue.Front() - } - if pe == nil { - break - } - e = pe + heap.Remove(sh.schedQueue, i) + i-- continue } } diff --git a/sealtasks/task.go b/sealtasks/task.go index 8fbe7a7b4..0a94d2c04 100644 --- a/sealtasks/task.go +++ b/sealtasks/task.go @@ -13,3 +13,17 @@ const ( TTFetch TaskType = "seal/v0/fetch" ) + +var order = map[TaskType]int{ + TTAddPiece: 7, + TTPreCommit1: 6, + TTPreCommit2: 5, + TTCommit2: 4, + TTCommit1: 3, + TTFetch: 2, + TTFinalize: 1, +} + +func (a TaskType) Less(b TaskType) bool { + return order[a] < order[b] +}