sched: Fix tests
This commit is contained in:
parent
da96f06202
commit
903731adaf
@ -22,6 +22,10 @@ import (
|
||||
"github.com/filecoin-project/sector-storage/stores"
|
||||
)
|
||||
|
||||
func init() {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
}
|
||||
|
||||
type testStorage stores.StorageConfig
|
||||
|
||||
func newTestStorage(t *testing.T) *testStorage {
|
||||
|
17
sched.go
17
sched.go
@ -88,7 +88,7 @@ type schedWindowRequest struct {
|
||||
|
||||
type schedWindow struct {
|
||||
worker WorkerID
|
||||
allocated *activeResources
|
||||
allocated activeResources
|
||||
todo []*workerRequest
|
||||
}
|
||||
|
||||
@ -132,10 +132,12 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
|
||||
watchClosing: make(chan WorkerID),
|
||||
workerClosing: make(chan WorkerID),
|
||||
|
||||
schedule: make(chan *workerRequest),
|
||||
closing: make(chan struct{}),
|
||||
schedule: make(chan *workerRequest),
|
||||
windowRequests: make(chan *schedWindowRequest),
|
||||
|
||||
schedQueue: &requestQueue{},
|
||||
|
||||
closing: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
@ -295,7 +297,7 @@ func (sh *scheduler) trySched() {
|
||||
wr := sh.workers[wid].info.Resources
|
||||
|
||||
// TODO: allow bigger windows
|
||||
if windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
|
||||
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -305,6 +307,11 @@ func (sh *scheduler) trySched() {
|
||||
break
|
||||
}
|
||||
|
||||
if selectedWindow < 0 {
|
||||
// all windows full
|
||||
continue
|
||||
}
|
||||
|
||||
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
|
||||
|
||||
heap.Remove(sh.schedQueue, sqi)
|
||||
@ -327,6 +334,7 @@ func (sh *scheduler) trySched() {
|
||||
|
||||
scheduledWindows[wnd] = struct{}{}
|
||||
|
||||
window := window // copy
|
||||
select {
|
||||
case sh.openWindows[wnd].done <- &window:
|
||||
default:
|
||||
@ -390,6 +398,7 @@ func (sh *scheduler) runWorker(wid WorkerID) {
|
||||
case w := <-scheduledWindows:
|
||||
activeWindows = append(activeWindows, w)
|
||||
case <-taskDone:
|
||||
log.Debugw("task done", "workerid", wid)
|
||||
case <-sh.closing:
|
||||
return
|
||||
case <-workerClosing:
|
||||
|
@ -384,8 +384,8 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
|
||||
}
|
||||
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
iw := big.Mul(big.NewInt(int64(candidates[i].fsi.Available)), big.NewInt(int64(candidates[i].info.Weight)))
|
||||
jw := big.Mul(big.NewInt(int64(candidates[j].fsi.Available)), big.NewInt(int64(candidates[j].info.Weight)))
|
||||
iw := big.Mul(big.NewInt(candidates[i].fsi.Available), big.NewInt(int64(candidates[i].info.Weight)))
|
||||
jw := big.Mul(big.NewInt(candidates[j].fsi.Available), big.NewInt(int64(candidates[j].info.Weight)))
|
||||
|
||||
return iw.GreaterThan(jw)
|
||||
})
|
||||
|
@ -13,6 +13,7 @@ import (
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/sector-storage/fsutil"
|
||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user