diff --git a/request_queue.go b/request_queue.go index 0d35e9f1d..b40375dc8 100644 --- a/request_queue.go +++ b/request_queue.go @@ -1,6 +1,6 @@ package sectorstorage -import "container/heap" +import "sort" type requestQueue []*workerRequest @@ -24,21 +24,22 @@ func (q requestQueue) Swap(i, j int) { q[j].index = j } -func (q *requestQueue) Push(x interface{}) { +func (q *requestQueue) Push(x *workerRequest) { n := len(*q) - item := x.(*workerRequest) + item := x item.index = n *q = append(*q, item) + sort.Sort(q) } -func (q *requestQueue) Pop() interface{} { +func (q *requestQueue) Remove(i int) *workerRequest { old := *q n := len(old) - item := old[n-1] - old[n-1] = nil // avoid memory leak - item.index = -1 // for safety + item := old[i] + old[i] = old[n - 1] + old[n - 1] = nil + item.index = -1 *q = old[0 : n-1] + sort.Sort(q) return item } - -var _ heap.Interface = &requestQueue{} diff --git a/request_queue_test.go b/request_queue_test.go index 9bf231e39..cb4a5d5dd 100644 --- a/request_queue_test.go +++ b/request_queue_test.go @@ -1,7 +1,7 @@ package sectorstorage import ( - "container/heap" + "fmt" "testing" "github.com/filecoin-project/sector-storage/sealtasks" @@ -10,19 +10,51 @@ import ( func TestRequestQueue(t *testing.T) { rq := &requestQueue{} - heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) - heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) - heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit2}) - heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) - heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) + rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece}) + rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1}) + rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2}) + rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1}) + rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece}) - pt := heap.Pop(rq).(*workerRequest) + dump := func(s string) { + fmt.Println("---") + fmt.Println(s) + + for sqi := 0; sqi < rq.Len(); sqi++ { + task := (*rq)[sqi] + fmt.Println(sqi, task.taskType) + } + } + + dump("start") + + pt := rq.Remove(0) + + dump("pop 1") if pt.taskType != sealtasks.TTPreCommit2 { t.Error("expected precommit2, got", pt.taskType) } - pt = heap.Pop(rq).(*workerRequest) + pt = rq.Remove(0) + + dump("pop 2") + + if pt.taskType != sealtasks.TTPreCommit1 { + t.Error("expected precommit1, got", pt.taskType) + } + + pt = rq.Remove(1) + + dump("pop 3") + + if pt.taskType != sealtasks.TTAddPiece { + t.Error("expected addpiece, got", pt.taskType) + } + + pt = rq.Remove(0) + + dump("pop 4") if pt.taskType != sealtasks.TTPreCommit1 { t.Error("expected precommit1, got", pt.taskType) diff --git a/sched.go b/sched.go index 239b52063..b549eb7c9 100644 --- a/sched.go +++ b/sched.go @@ -1,7 +1,6 @@ package sectorstorage import ( - "container/heap" "context" "fmt" "math/rand" @@ -222,7 +221,7 @@ func (sh *scheduler) runSched() { sh.dropWorker(wid) case req := <-sh.schedule: - heap.Push(sh.schedQueue, req) + sh.schedQueue.Push(req) sh.trySched() if sh.testSync != nil { @@ -381,7 +380,7 @@ func (sh *scheduler) trySched() { windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) - heap.Remove(sh.schedQueue, sqi) + sh.schedQueue.Remove(sqi) sqi-- scheduled++ }