sched: Configurable assigners

Łukasz Magiera 2022-05-23 16:58:43 +02:00
parent 588b8ecbca
commit 5ba8bd3b99
8 changed files with 116 additions and 10 deletions

View File

@@ -122,6 +122,8 @@ type Config struct {
// PoSt config
ParallelCheckLimit int
Assigner string
}
type StorageAuth http.Header
@@ -135,6 +137,11 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
sh, err := newScheduler(sc.Assigner)
if err != nil {
return nil, err
}
m := &Manager{
ls: ls,
storage: stor,
@@ -142,7 +149,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}},
index: si,
sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

View File

@@ -109,6 +109,9 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})
sh, err := newScheduler("")
require.NoError(t, err)
m := &Manager{
ls: st,
storage: stor,
@@ -116,7 +119,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
remoteHnd: &stores.FetchHandler{Local: lstor},
index: si,
sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

View File

@@ -148,9 +148,19 @@ type workerResponse struct {
err error
}
func newScheduler() *Scheduler {
func newScheduler(assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
case "", "utilization":
a = NewLowestUtilizationAssigner()
case "spread":
a = NewSpreadAssigner()
default:
return nil, xerrors.Errorf("unknown assigner '%s'", assigner)
}
return &Scheduler{
assigner: NewLowestUtilizationAssigner(),
assigner: a,
Workers: map[storiface.WorkerID]*WorkerHandle{},
@@ -171,7 +181,7 @@ func newScheduler() *Scheduler {
closing: make(chan struct{}),
closed: make(chan struct{}),
}
}, nil
}
func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
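Note on the types involved: the Assigner interface and the AssignerCommon helper that newScheduler now selects between are defined elsewhere in the sectorstorage package and are not part of this diff. A rough sketch of the shape they are assumed to have, inferred from the constructors and the WindowSel field used below:

type Assigner interface {
    // TrySched moves requests from the scheduling queue into open worker windows.
    TrySched(sh *Scheduler)
}

// WindowSelector is the pluggable part: given the queue and the currently open
// windows, it fills the windows and reports how many requests it scheduled.
type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

// AssignerCommon wires a WindowSelector into the common assignment loop.
type AssignerCommon struct {
    WindowSel WindowSelector
}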

View File

@@ -0,0 +1,77 @@
package sectorstorage
import (
"math"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
func NewSpreadAssigner() Assigner {
return &AssignerCommon{
WindowSel: SpreadWS,
}
}
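// SpreadWS is the window-selection heuristic behind the "spread" assigner: for
// each queued task it picks, among the windows that can accept it, the worker
// that has received the fewest assignments in this scheduling pass.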
func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerAssigned := map[storiface.WorkerID]int{}
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.SchedQueue)[sqi]
selectedWindow := -1
var bestWid storiface.WorkerID
bestAssigned := math.MaxInt // smaller = better
// Among the windows this task can run in, pick the worker that has been
// handed the fewest tasks so far in this scheduling pass.
for i, wnd := range acceptableWindows[task.IndexHeap] {
wid := sh.OpenWindows[wnd].Worker
w := sh.Workers[wid]
// Resource requirements for this task type on this candidate worker.
res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", w.Info) {
continue
}
wu := workerAssigned[wid]
if wu >= bestAssigned {
continue
}
bestWid = wid
selectedWindow = wnd
bestAssigned = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
log.Debugw("SCHED ASSIGNED",
"sqi", sqi,
"sector", task.Sector.ID.Number,
"task", task.TaskType,
"window", selectedWindow,
"worker", bestWid,
"assigned", bestAssigned)
workerAssigned[bestWid] += 1
windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.SchedQueue.Remove(rmQueue[i])
}
}
return scheduled
}
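To see the effect of the spread heuristic in isolation, here is a small self-contained toy sketch (hypothetical names, not lotus code) of the same "fewest assignments so far wins" rule applied to a fixed set of workers:

package main

import (
    "fmt"
    "math"
)

// pickLeastLoaded returns the candidate with the fewest tasks assigned so far,
// mirroring the "smaller = better" comparison in SpreadWS.
func pickLeastLoaded(candidates []string, assigned map[string]int) string {
    best, bestCount := "", math.MaxInt
    for _, c := range candidates {
        if assigned[c] < bestCount {
            best, bestCount = c, assigned[c]
        }
    }
    return best
}

func main() {
    assigned := map[string]int{}
    for i := 0; i < 5; i++ {
        w := pickLeastLoaded([]string{"w1", "w2", "w3"}, assigned)
        assigned[w]++
        fmt.Printf("task %d -> %s\n", i, w)
    }
    // Tasks land on w1, w2, w3, w1, w2, spreading across all three workers.
}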

View File

@@ -13,7 +13,6 @@ func NewLowestUtilizationAssigner() Assigner {
}
func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
// Step 2
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerUtil := map[storiface.WorkerID]float64{}
@@ -36,7 +35,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int,
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)
// TODO: allow bigger windows
if !windows[wnd].Allocated.CanHandleRequest(needRes, wid, "schedAssign", info) {
if !windows[wnd].Allocated.CanHandleRequest(res, wid, "schedAssign", info) {
continue
}

View File

@@ -223,7 +223,8 @@ func addTestWorker(t *testing.T, sched *Scheduler, index *stores.Index, name str
}
func TestSchedStartStop(t *testing.T) {
sched := newScheduler()
sched, err := newScheduler("")
require.NoError(t, err)
go sched.runSched()
addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
@@ -352,7 +353,8 @@ func TestSched(t *testing.T) {
return func(t *testing.T) {
index := stores.NewIndex()
sched := newScheduler()
sched, err := newScheduler("")
require.NoError(t, err)
sched.testSync = make(chan struct{})
go sched.runSched()
@@ -604,7 +606,8 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
sched := newScheduler()
sched, err := newScheduler("")
require.NoError(b, err)
sched.Workers[storiface.WorkerID{}] = &WorkerHandle{
workerRpc: nil,
Info: storiface.WorkerInfo{

View File

@@ -159,6 +159,8 @@ func DefaultStorageMiner() *StorageMiner {
// it's the ratio between 10gbit / 1gbit
ParallelFetchLimit: 10,
Assigner: "utilization",
// By default use the hardware resource filtering strategy.
ResourceFiltering: sectorstorage.ResourceFilteringHardware,
},
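So "utilization" is what miners get out of the box. A hedged sketch of overriding it programmatically, assuming the SealerConfig shown in this commit sits under the StorageMiner config's Storage field, i.e. the [Storage] section of the miner's config.toml:

package main

import "github.com/filecoin-project/lotus/node/config"

func main() {
    cfg := config.DefaultStorageMiner()
    // Hand sectors to as many distinct workers as possible instead of
    // packing the least-utilized workers first.
    cfg.Storage.Assigner = "spread"
    _ = cfg
}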

View File

@@ -330,6 +330,11 @@ type SealerConfig struct {
AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool
// Assigner specifies the worker assigner to use when scheduling tasks.
// "utilization" (default) - assign tasks to workers with lowest utilization.
// "spread" - assign tasks to as many distinct workers as possible.
Assigner string
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".