container/heap doesn't provide sorted array

This commit is contained in:
Łukasz Magiera 2020-07-27 13:20:18 +02:00
parent 9fd91bb70a
commit 510897a497
3 changed files with 52 additions and 20 deletions

View File

@ -1,6 +1,6 @@
package sectorstorage package sectorstorage
import "container/heap" import "sort"
type requestQueue []*workerRequest type requestQueue []*workerRequest
@ -24,21 +24,22 @@ func (q requestQueue) Swap(i, j int) {
q[j].index = j q[j].index = j
} }
func (q *requestQueue) Push(x interface{}) { func (q *requestQueue) Push(x *workerRequest) {
n := len(*q) n := len(*q)
item := x.(*workerRequest) item := x
item.index = n item.index = n
*q = append(*q, item) *q = append(*q, item)
sort.Sort(q)
} }
func (q *requestQueue) Pop() interface{} { func (q *requestQueue) Remove(i int) *workerRequest {
old := *q old := *q
n := len(old) n := len(old)
item := old[n-1] item := old[i]
old[n-1] = nil // avoid memory leak old[i] = old[n - 1]
item.index = -1 // for safety old[n - 1] = nil
item.index = -1
*q = old[0 : n-1] *q = old[0 : n-1]
sort.Sort(q)
return item return item
} }
var _ heap.Interface = &requestQueue{}

View File

@ -1,7 +1,7 @@
package sectorstorage package sectorstorage
import ( import (
"container/heap" "fmt"
"testing" "testing"
"github.com/filecoin-project/sector-storage/sealtasks" "github.com/filecoin-project/sector-storage/sealtasks"
@ -10,19 +10,51 @@ import (
func TestRequestQueue(t *testing.T) { func TestRequestQueue(t *testing.T) {
rq := &requestQueue{} rq := &requestQueue{}
heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit2}) rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2})
heap.Push(rq, &workerRequest{taskType: sealtasks.TTPreCommit1}) rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
heap.Push(rq, &workerRequest{taskType: sealtasks.TTAddPiece}) rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
pt := heap.Pop(rq).(*workerRequest) dump := func(s string) {
fmt.Println("---")
fmt.Println(s)
for sqi := 0; sqi < rq.Len(); sqi++ {
task := (*rq)[sqi]
fmt.Println(sqi, task.taskType)
}
}
dump("start")
pt := rq.Remove(0)
dump("pop 1")
if pt.taskType != sealtasks.TTPreCommit2 { if pt.taskType != sealtasks.TTPreCommit2 {
t.Error("expected precommit2, got", pt.taskType) t.Error("expected precommit2, got", pt.taskType)
} }
pt = heap.Pop(rq).(*workerRequest) pt = rq.Remove(0)
dump("pop 2")
if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType)
}
pt = rq.Remove(1)
dump("pop 3")
if pt.taskType != sealtasks.TTAddPiece {
t.Error("expected addpiece, got", pt.taskType)
}
pt = rq.Remove(0)
dump("pop 4")
if pt.taskType != sealtasks.TTPreCommit1 { if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType) t.Error("expected precommit1, got", pt.taskType)

View File

@ -1,7 +1,6 @@
package sectorstorage package sectorstorage
import ( import (
"container/heap"
"context" "context"
"fmt" "fmt"
"math/rand" "math/rand"
@ -222,7 +221,7 @@ func (sh *scheduler) runSched() {
sh.dropWorker(wid) sh.dropWorker(wid)
case req := <-sh.schedule: case req := <-sh.schedule:
heap.Push(sh.schedQueue, req) sh.schedQueue.Push(req)
sh.trySched() sh.trySched()
if sh.testSync != nil { if sh.testSync != nil {
@ -381,7 +380,7 @@ func (sh *scheduler) trySched() {
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task) windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
heap.Remove(sh.schedQueue, sqi) sh.schedQueue.Remove(sqi)
sqi-- sqi--
scheduled++ scheduled++
} }