Merge pull request #67 from filecoin-project/feat/windows

Window based scheduling
This commit is contained in:
Łukasz Magiera 2020-07-17 23:35:54 +02:00 committed by GitHub
commit a109ef9cbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 932 additions and 267 deletions

View File

@ -208,7 +208,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
var selector WorkerSelector
if len(best) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // append to existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
@ -269,7 +269,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
var selector WorkerSelector
var err error
if len(existingPieces) == 0 { // new
selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing)
} else { // use existing
selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
}
@ -300,10 +300,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
// TODO: also consider where the unsealed data sits
selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
if err != nil {
return nil, xerrors.Errorf("creating path selector: %w", err)
}
selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
@ -417,11 +414,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
return err
}
fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
if err != nil {
return xerrors.Errorf("creating fetchSel: %w", err)
}
fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage)
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
@ -496,8 +489,8 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (fsutil.FsStat, erro
return m.storage.FsStat(ctx, id)
}
func (m *Manager) Close() error {
return m.sched.Close()
func (m *Manager) Close(ctx context.Context) error {
return m.sched.Close(ctx)
}
var _ SectorManager = &Manager{}

View File

@ -23,6 +23,10 @@ import (
"github.com/filecoin-project/sector-storage/stores"
)
func init() {
logging.SetAllLoggers(logging.LevelDebug)
}
type testStorage stores.StorageConfig
func (t testStorage) DiskUsage(path string) (int64, error) {

621
sched.go
View File

@ -3,11 +3,12 @@ package sectorstorage
import (
"container/heap"
"context"
"fmt"
"math/rand"
"sort"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"golang.org/x/xerrors"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -20,6 +21,11 @@ type schedPrioCtxKey int
var SchedPriorityKey schedPrioCtxKey
var DefaultSchedPriority = 0
var SelectorTimeout = 5 * time.Second
var (
SchedWindows = 2
)
func getPriority(ctx context.Context) int {
sp := ctx.Value(SchedPriorityKey)
@ -56,11 +62,69 @@ type scheduler struct {
watchClosing chan WorkerID
workerClosing chan WorkerID
schedule chan *workerRequest
workerFree chan WorkerID
closing chan struct{}
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
schedQueue *requestQueue
// owned by the sh.runSched goroutine
schedQueue *requestQueue
openWindows []*schedWindowRequest
closing chan struct{}
closed chan struct{}
testSync chan struct{} // used for testing
}
type workerHandle struct {
w Worker
info storiface.WorkerInfo
preparing *activeResources
active *activeResources
// for sync manager goroutine closing
cleanupStarted bool
closedMgr chan struct{}
closingMgr chan struct{}
}
type schedWindowRequest struct {
worker WorkerID
done chan *schedWindow
}
type schedWindow struct {
allocated activeResources
todo []*workerRequest
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
cpuUse uint64
cond *sync.Cond
}
type workerRequest struct {
sector abi.SectorID
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
prepare WorkerAction
work WorkerAction
index int // The index of the item in the heap.
ret chan<- workerResponse
ctx context.Context
}
type workerResponse struct {
err error
}
func newScheduler(spt abi.RegisteredSealProof) *scheduler {
@ -75,11 +139,13 @@ func newScheduler(spt abi.RegisteredSealProof) *scheduler {
watchClosing: make(chan WorkerID),
workerClosing: make(chan WorkerID),
schedule: make(chan *workerRequest),
workerFree: make(chan WorkerID),
closing: make(chan struct{}),
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest),
schedQueue: &requestQueue{},
closing: make(chan struct{}),
closed: make(chan struct{}),
}
}
@ -115,25 +181,6 @@ func (sh *scheduler) Schedule(ctx context.Context, sector abi.SectorID, taskType
}
}
type workerRequest struct {
sector abi.SectorID
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
prepare WorkerAction
work WorkerAction
index int // The index of the item in the heap.
ret chan<- workerResponse
ctx context.Context
}
type workerResponse struct {
err error
}
func (r *workerRequest) respond(err error) {
select {
case r.ret <- workerResponse{err: err}:
@ -142,46 +189,30 @@ func (r *workerRequest) respond(err error) {
}
}
type activeResources struct {
memUsedMin uint64
memUsedMax uint64
gpuUsed bool
cpuUse uint64
cond *sync.Cond
}
type workerHandle struct {
w Worker
info storiface.WorkerInfo
preparing *activeResources
active *activeResources
}
func (sh *scheduler) runSched() {
defer close(sh.closed)
go sh.runWorkerWatcher()
for {
select {
case w := <-sh.newWorkers:
sh.schedNewWorker(w)
case wid := <-sh.workerClosing:
sh.schedDropWorker(wid)
case req := <-sh.schedule:
scheduled, err := sh.maybeSchedRequest(req)
if err != nil {
req.respond(err)
continue
}
if scheduled {
continue
}
sh.newWorker(w)
case wid := <-sh.workerClosing:
sh.dropWorker(wid)
case req := <-sh.schedule:
heap.Push(sh.schedQueue, req)
case wid := <-sh.workerFree:
sh.onWorkerFreed(wid)
sh.trySched()
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.trySched()
case <-sh.closing:
sh.schedClose()
return
@ -189,105 +220,267 @@ func (sh *scheduler) runSched() {
}
}
func (sh *scheduler) onWorkerFreed(wid WorkerID) {
sh.workersLk.Lock()
w, ok := sh.workers[wid]
sh.workersLk.Unlock()
if !ok {
log.Warnf("onWorkerFreed on invalid worker %d", wid)
func (sh *scheduler) trySched() {
/*
This assigns tasks to workers based on:
- Task priority (achieved by handling sh.schedQueue in order, since it's already sorted by priority)
- Worker resource availability
- Task-specified worker preference (acceptableWindows array below sorted by this preference)
- Window request age
1. For each task in the schedQueue find windows which can handle them
1.1. Create list of windows capable of handling a task
1.2. Sort windows according to task selector preferences
2. Going through schedQueue again, assign task to first acceptable window
with resources available
3. Submit windows with scheduled tasks to workers
*/
windows := make([]schedWindow, len(sh.openWindows))
acceptableWindows := make([][]int, sh.schedQueue.Len())
log.Debugf("trySched %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
// Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
for wnd, windowRequest := range sh.openWindows {
worker := sh.workers[windowRequest.worker]
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, worker.info.Resources) {
continue
}
ok, err := task.sel.Ok(task.ctx, task.taskType, sh.spt, worker)
if err != nil {
log.Errorf("trySched(1) req.sel.Ok error: %+v", err)
continue
}
if !ok {
continue
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}
if len(acceptableWindows[sqi]) == 0 {
continue
}
// Pick best worker (shuffle in case some workers are equally as good)
rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) {
acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i]
})
sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool {
wii := sh.openWindows[acceptableWindows[sqi][i]].worker
wji := sh.openWindows[acceptableWindows[sqi][j]].worker
if wii == wji {
// for the same worker prefer older windows
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j]
}
wi := sh.workers[wii]
wj := sh.workers[wji]
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
defer cancel()
r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj)
if err != nil {
log.Error("selecting best worker: %s", err)
}
return r
})
}
// Step 2
scheduled := 0
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][sh.spt]
selectedWindow := -1
for _, wnd := range acceptableWindows[sqi+scheduled] {
wid := sh.openWindows[wnd].worker
wr := sh.workers[wid].info.Resources
log.Debugf("trySched try assign sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, wid, wr) {
continue
}
log.Debugf("trySched ASSIGNED sqi:%d sector %d to window %d", sqi, task.sector.Number, wnd)
windows[wnd].allocated.add(wr, needRes)
selectedWindow = wnd
break
}
if selectedWindow < 0 {
// all windows full
continue
}
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
heap.Remove(sh.schedQueue, sqi)
sqi--
scheduled++
}
// Step 3
if scheduled == 0 {
return
}
for i := 0; i < sh.schedQueue.Len(); i++ {
req := (*sh.schedQueue)[i]
ok, err := req.sel.Ok(req.ctx, req.taskType, sh.spt, w)
if err != nil {
log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
scheduledWindows := map[int]struct{}{}
for wnd, window := range windows {
if len(window.todo) == 0 {
// Nothing scheduled here, keep the window open
continue
}
if !ok {
continue
}
scheduledWindows[wnd] = struct{}{}
scheduled, err := sh.maybeSchedRequest(req)
if err != nil {
req.respond(err)
continue
}
if scheduled {
heap.Remove(sh.schedQueue, i)
i--
continue
window := window // copy
select {
case sh.openWindows[wnd].done <- &window:
default:
log.Error("expected sh.openWindows[wnd].done to be buffered")
}
}
// Rewrite sh.openWindows array, removing scheduled windows
newOpenWindows := make([]*schedWindowRequest, 0, len(sh.openWindows)-len(scheduledWindows))
for wnd, window := range sh.openWindows {
if _, scheduled := scheduledWindows[wnd]; scheduled {
// keep unscheduled windows open
continue
}
newOpenWindows = append(newOpenWindows, window)
}
sh.openWindows = newOpenWindows
}
var selectorTimeout = 5 * time.Second
func (sh *scheduler) runWorker(wid WorkerID) {
var ready sync.WaitGroup
ready.Add(1)
defer ready.Wait()
func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
go func() {
sh.workersLk.Lock()
worker, found := sh.workers[wid]
sh.workersLk.Unlock()
tried := 0
var acceptable []WorkerID
ready.Done()
needRes := ResourceTable[req.taskType][sh.spt]
if !found {
panic(fmt.Sprintf("worker %d not found", wid))
}
for wid, worker := range sh.workers {
rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
ok, err := req.sel.Ok(rpcCtx, req.taskType, sh.spt, worker)
cancel()
defer close(worker.closedMgr)
scheduledWindows := make(chan *schedWindow, SchedWindows)
taskDone := make(chan struct{}, 1)
windowsRequested := 0
var activeWindows []*schedWindow
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
workerClosing, err := worker.w.Closing(ctx)
if err != nil {
return false, err
return
}
if !ok {
continue
}
tried++
defer func() {
log.Warnw("Worker closing", "workerid", wid)
if !canHandleRequest(needRes, wid, worker.info.Resources, worker.preparing) {
continue
}
// TODO: close / return all queued tasks
}()
acceptable = append(acceptable, wid)
}
if len(acceptable) > 0 {
{
var serr error
sort.SliceStable(acceptable, func(i, j int) bool {
rpcCtx, cancel := context.WithTimeout(req.ctx, selectorTimeout)
defer cancel()
r, err := req.sel.Cmp(rpcCtx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]])
if err != nil {
serr = multierror.Append(serr, err)
for {
// ask for more windows if we need them
for ; windowsRequested < SchedWindows; windowsRequested++ {
select {
case sh.windowRequests <- &schedWindowRequest{
worker: wid,
done: scheduledWindows,
}:
case <-sh.closing:
return
case <-workerClosing:
return
case <-worker.closingMgr:
return
}
return r
})
}
if serr != nil {
return false, xerrors.Errorf("error(s) selecting best worker: %w", serr)
select {
case w := <-scheduledWindows:
activeWindows = append(activeWindows, w)
case <-taskDone:
log.Debugw("task done", "workerid", wid)
case <-sh.closing:
return
case <-workerClosing:
return
case <-worker.closingMgr:
return
}
assignLoop:
// process windows in order
for len(activeWindows) > 0 {
// process tasks within a window in order
for len(activeWindows[0].todo) > 0 {
todo := activeWindows[0].todo[0]
needRes := ResourceTable[todo.taskType][sh.spt]
sh.workersLk.Lock()
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
if !ok {
sh.workersLk.Unlock()
break assignLoop
}
log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.Unlock()
if err != nil {
log.Error("assignWorker error: %+v", err)
go todo.respond(xerrors.Errorf("assignWorker error: %w", err))
}
activeWindows[0].todo = activeWindows[0].todo[1:]
}
copy(activeWindows, activeWindows[1:])
activeWindows[len(activeWindows)-1] = nil
activeWindows = activeWindows[:len(activeWindows)-1]
windowsRequested--
}
}
return true, sh.assignWorker(acceptable[0], sh.workers[acceptable[0]], req)
}
if tried == 0 {
return false, xerrors.New("maybeSchedRequest didn't find any good workers")
}
return false, nil // put in waiting queue
}()
}
func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error {
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
needRes := ResourceTable[req.taskType][sh.spt]
w.preparing.add(w.info.Resources, needRes)
@ -301,7 +494,7 @@ func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequ
sh.workersLk.Unlock()
select {
case sh.workerFree <- wid:
case taskDone <- struct{}{}:
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
}
@ -322,7 +515,7 @@ func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequ
defer sh.workersLk.Lock() // we MUST return locked from this function
select {
case sh.workerFree <- wid:
case taskDone <- struct{}{}:
case <-sh.closing:
}
@ -350,110 +543,10 @@ func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequ
return nil
}
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
for !canHandleRequest(r, id, wr, a) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
a.cond.Wait()
}
func (sh *scheduler) newWorker(w *workerHandle) {
w.closedMgr = make(chan struct{})
w.closingMgr = make(chan struct{})
a.add(wr, r)
err := cb()
a.free(wr, r)
if a.cond != nil {
a.cond.Broadcast()
}
return err
}
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
a.gpuUsed = r.CanGPU
if r.MultiThread() {
a.cpuUse += wr.CPUs
} else {
a.cpuUse += uint64(r.Threads)
}
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
}
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = false
}
if r.MultiThread() {
a.cpuUse -= wr.CPUs
} else {
a.cpuUse -= uint64(r.Threads)
}
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
}
func canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool {
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
return false
}
maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
return false
}
if needRes.MultiThread() {
if active.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs)
return false
}
} else {
if active.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs)
return false
}
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if active.gpuUsed {
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
return false
}
}
return true
}
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
var max float64
cpu := float64(a.cpuUse) / float64(wr.CPUs)
max = cpu
memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
if memMin > max {
max = memMin
}
memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
if memMax > max {
max = memMax
}
return max
}
func (sh *scheduler) schedNewWorker(w *workerHandle) {
sh.workersLk.Lock()
id := sh.nextWorker
@ -462,41 +555,73 @@ func (sh *scheduler) schedNewWorker(w *workerHandle) {
sh.workersLk.Unlock()
sh.runWorker(id)
select {
case sh.watchClosing <- id:
case <-sh.closing:
return
}
sh.onWorkerFreed(id)
}
func (sh *scheduler) schedDropWorker(wid WorkerID) {
func (sh *scheduler) dropWorker(wid WorkerID) {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
w := sh.workers[wid]
delete(sh.workers, wid)
go func() {
if err := w.w.Close(); err != nil {
log.Warnf("closing worker %d: %+v", err)
sh.workerCleanup(wid, w)
delete(sh.workers, wid)
}
func (sh *scheduler) workerCleanup(wid WorkerID, w *workerHandle) {
if !w.cleanupStarted {
close(w.closingMgr)
}
select {
case <-w.closedMgr:
case <-time.After(time.Second):
log.Errorf("timeout closing worker manager goroutine %d", wid)
}
if !w.cleanupStarted {
w.cleanupStarted = true
newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
for _, window := range sh.openWindows {
if window.worker != wid {
newWindows = append(newWindows, window)
}
}
}()
sh.openWindows = newWindows
log.Debugf("dropWorker %d", wid)
go func() {
if err := w.w.Close(); err != nil {
log.Warnf("closing worker %d: %+v", err)
}
}()
}
}
func (sh *scheduler) schedClose() {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
log.Debugf("closing scheduler")
for i, w := range sh.workers {
if err := w.w.Close(); err != nil {
log.Errorf("closing worker %d: %+v", i, err)
}
sh.workerCleanup(i, w)
}
}
func (sh *scheduler) Close() error {
func (sh *scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {
case <-sh.closed:
case <-ctx.Done():
return ctx.Err()
}
return nil
}

110
sched_resources.go Normal file
View File

@ -0,0 +1,110 @@
package sectorstorage
import (
"sync"
"github.com/filecoin-project/sector-storage/storiface"
)
func (a *activeResources) withResources(id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(r, id, wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
a.cond.Wait()
}
a.add(wr, r)
err := cb()
a.free(wr, r)
if a.cond != nil {
a.cond.Broadcast()
}
return err
}
func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
a.gpuUsed = r.CanGPU
if r.MultiThread() {
a.cpuUse += wr.CPUs
} else {
a.cpuUse += uint64(r.Threads)
}
a.memUsedMin += r.MinMemory
a.memUsedMax += r.MaxMemory
}
func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
if r.CanGPU {
a.gpuUsed = false
}
if r.MultiThread() {
a.cpuUse -= wr.CPUs
} else {
a.cpuUse -= uint64(r.Threads)
}
a.memUsedMin -= r.MinMemory
a.memUsedMax -= r.MaxMemory
}
func (a *activeResources) canHandleRequest(needRes Resources, wid WorkerID, res storiface.WorkerResources) bool {
// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
minNeedMem := res.MemReserved + a.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
if minNeedMem > res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
return false
}
maxNeedMem := res.MemReserved + a.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
if maxNeedMem > res.MemSwap+res.MemPhysical {
log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
return false
}
if needRes.MultiThread() {
if a.cpuUse > 0 {
log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, a.cpuUse, res.CPUs)
return false
}
} else {
if a.cpuUse+uint64(needRes.Threads) > res.CPUs {
log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, a.cpuUse, res.CPUs)
return false
}
}
if len(res.GPUs) > 0 && needRes.CanGPU {
if a.gpuUsed {
log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
return false
}
}
return true
}
func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
var max float64
cpu := float64(a.cpuUse) / float64(wr.CPUs)
max = cpu
memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
if memMin > max {
max = memMin
}
memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
if memMax > max {
max = memMax
}
return max
}

View File

@ -2,9 +2,23 @@ package sectorstorage
import (
"context"
"fmt"
"io"
"runtime"
"sync"
"testing"
"time"
"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/sector-storage/fsutil"
"github.com/filecoin-project/sector-storage/sealtasks"
"github.com/filecoin-project/sector-storage/stores"
"github.com/filecoin-project/sector-storage/storiface"
"github.com/filecoin-project/specs-storage/storage"
)
func TestWithPriority(t *testing.T) {
@ -16,3 +30,421 @@ func TestWithPriority(t *testing.T) {
require.Equal(t, 2222, getPriority(ctx))
}
type schedTestWorker struct {
name string
taskTypes map[sealtasks.TaskType]struct{}
paths []stores.StoragePath
closed bool
closing chan struct{}
}
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
panic("implement me")
}
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
panic("implement me")
}
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
panic("implement me")
}
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
panic("implement me")
}
func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
panic("implement me")
}
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error {
panic("implement me")
}
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
panic("implement me")
}
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
panic("implement me")
}
func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return s.taskTypes, nil
}
func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return s.paths, nil
}
func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{
Hostname: s.name,
Resources: storiface.WorkerResources{
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemReserved: 2 << 30,
CPUs: 32,
GPUs: []string{"a GPU"},
},
}, nil
}
func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) {
return s.closing, nil
}
func (s *schedTestWorker) Close() error {
if !s.closed {
log.Info("close schedTestWorker")
s.closed = true
close(s.closing)
}
return nil
}
var _ Worker = &schedTestWorker{}
func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) {
w := &schedTestWorker{
name: name,
taskTypes: taskTypes,
paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "<octopus>food</octopus>", CanSeal: true, CanStore: true}},
closing: make(chan struct{}),
}
for _, path := range w.paths {
err := index.StorageAttach(context.TODO(), stores.StorageInfo{
ID: path.ID,
URLs: nil,
Weight: path.Weight,
CanSeal: path.CanSeal,
CanStore: path.CanStore,
}, fsutil.FsStat{
Capacity: 1 << 40,
Available: 1 << 40,
Reserved: 3,
})
require.NoError(t, err)
}
info, err := w.Info(context.TODO())
require.NoError(t, err)
sched.newWorkers <- &workerHandle{
w: w,
info: info,
preparing: &activeResources{},
active: &activeResources{},
}
}
func TestSchedStartStop(t *testing.T) {
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
sched := newScheduler(spt)
go sched.runSched()
addTestWorker(t, sched, stores.NewIndex(), "fred", nil)
require.NoError(t, sched.Close(context.TODO()))
}
func TestSched(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 30*time.Second)
defer done()
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
type workerSpec struct {
name string
taskTypes map[sealtasks.TaskType]struct{}
}
noopAction := func(ctx context.Context, w Worker) error {
return nil
}
type runMeta struct {
done map[string]chan struct{}
wg sync.WaitGroup
}
type task func(*testing.T, *scheduler, *stores.Index, *runMeta)
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
done := make(chan struct{})
rm.done[taskName] = done
sel := newAllocSelector(ctx, index, stores.FTCache, stores.PathSealing)
rm.wg.Add(1)
go func() {
defer rm.wg.Done()
sectorNum := abi.SectorID{
Miner: 8,
Number: sid,
}
err := sched.Schedule(ctx, sectorNum, taskType, sel, func(ctx context.Context, w Worker) error {
wi, err := w.Info(ctx)
require.NoError(t, err)
require.Equal(t, expectWorker, wi.Hostname)
log.Info("IN ", taskName)
for {
_, ok := <-done
if !ok {
break
}
}
log.Info("OUT ", taskName)
return nil
}, noopAction)
require.NoError(t, err, fmt.Sprint(l, l2))
}()
<-sched.testSync
}
}
taskStarted := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
}
}
taskDone := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
close(rm.done[name])
}
}
taskNotScheduled := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
t.Fatal("not expected", l, l2)
case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy
}
}
}
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
return func(t *testing.T) {
index := stores.NewIndex()
sched := newScheduler(spt)
sched.testSync = make(chan struct{})
go sched.runSched()
for _, worker := range workers {
addTestWorker(t, sched, index, worker.name, worker.taskTypes)
}
rm := runMeta{
done: map[string]chan struct{}{},
}
for _, task := range tasks {
task(t, sched, index, &rm)
}
log.Info("wait for async stuff")
rm.wg.Wait()
require.NoError(t, sched.Close(context.TODO()))
}
}
multTask := func(tasks ...task) task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
for _, tsk := range tasks {
tsk(t, s, index, meta)
}
}
}
t.Run("one-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
t.Run("pc1-2workers-1", testFunc([]workerSpec{
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
t.Run("pc1-2workers-2", testFunc([]workerSpec{
{name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
taskDone("pc1-1"),
}))
t.Run("pc1-block-pc2", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("pc1"),
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskNotScheduled("pc2"),
taskDone("pc1"),
taskDone("pc2"),
}))
t.Run("pc2-block-pc1", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
taskStarted("pc2"),
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("pc1"),
taskDone("pc2"),
taskDone("pc1"),
}))
t.Run("pc1-batching", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t1"),
sched("t2", "fred", 8, sealtasks.TTPreCommit1),
taskStarted("t2"),
// with worker settings, we can only run 2 parallel PC1s
// start 2 more to fill fetch buffer
sched("t3", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t3"),
sched("t4", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t4"),
taskDone("t1"),
taskDone("t2"),
taskStarted("t3"),
taskStarted("t4"),
taskDone("t3"),
taskDone("t4"),
}))
twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task {
return multTask(
sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1),
schedAssert(prefix+"-a"),
sched(prefix+"-b", "fred", sid+1, sealtasks.TTPreCommit1),
schedAssert(prefix+"-b"),
)
}
twoPC1Act := func(prefix string, schedAssert func(name string) task) task {
return multTask(
schedAssert(prefix+"-a"),
schedAssert(prefix+"-b"),
)
}
// run this one a bunch of times, it had a very annoying tendency to fail randomly
for i := 0; i < 40; i++ {
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
}, []task{
// fill queues
twoPC1("w0", 0, taskStarted),
twoPC1("w1", 2, taskNotScheduled),
// windowed
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
taskNotScheduled("t1"),
sched("t2", "fred", 9, sealtasks.TTPreCommit1),
taskNotScheduled("t2"),
sched("t3", "fred", 10, sealtasks.TTPreCommit2),
taskNotScheduled("t3"),
twoPC1Act("w0", taskDone),
twoPC1Act("w1", taskStarted),
twoPC1Act("w1", taskDone),
taskStarted("t3"),
taskNotScheduled("t1"),
taskNotScheduled("t2"),
taskDone("t3"),
taskStarted("t1"),
taskStarted("t2"),
taskDone("t1"),
taskDone("t2"),
}))
}
}

View File

@ -74,7 +74,11 @@ func (sh *scheduler) runWorkerWatcher() {
caseToWorker[toSet] = wid
default:
wid := caseToWorker[n]
wid, found := caseToWorker[n]
if !found {
log.Errorf("worker ID not found for case %d", n)
continue
}
delete(caseToWorker, n)
cases[n] = reflect.SelectCase{

View File

@ -17,12 +17,12 @@ type allocSelector struct {
ptype stores.PathType
}
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) {
func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector {
return &allocSelector{
index: index,
alloc: alloc,
ptype: ptype,
}, nil
}
}
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {

View File

@ -29,9 +29,6 @@ type StorageInfo struct {
CanSeal bool
CanStore bool
LastHeartbeat time.Time
HeartbeatErr error
}
type HealthReport struct {