diff --git a/storage/sealer/sched.go b/storage/sealer/sched.go index c2b7d6a2d..4f167eae3 100644 --- a/storage/sealer/sched.go +++ b/storage/sealer/sched.go @@ -157,7 +157,15 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) { case "", "utilization": a = NewLowestUtilizationAssigner() case "spread": - a = NewSpreadAssigner() + a = NewSpreadAssigner(false) + case "experiment-spread-qcount": + a = NewSpreadAssigner(true) + case "experiment-spread-tasks": + a = NewSpreadTasksAssigner(false) + case "experiment-spread-tasks-qcount": + a = NewSpreadTasksAssigner(true) + case "experiment-random": + a = NewRandomAssigner() default: return nil, xerrors.Errorf("unknown assigner '%s'", assigner) } diff --git a/storage/sealer/sched_assigner_darts.go b/storage/sealer/sched_assigner_darts.go new file mode 100644 index 000000000..e28b70e78 --- /dev/null +++ b/storage/sealer/sched_assigner_darts.go @@ -0,0 +1,88 @@ +package sealer + +import ( + "math/rand" + + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +func NewRandomAssigner() Assigner { + return &AssignerCommon{ + WindowSel: RandomWS, + } +} + +func RandomWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + scheduled := 0 + rmQueue := make([]int, 0, queueLen) + + for sqi := 0; sqi < queueLen; sqi++ { + task := (*sh.SchedQueue)[sqi] + + //bestAssigned := math.MaxInt // smaller = better + + type choice struct { + selectedWindow int + needRes storiface.Resources + info storiface.WorkerInfo + bestWid storiface.WorkerID + } + choices := make([]choice, 0, len(acceptableWindows[task.IndexHeap])) + + for i, wnd := range acceptableWindows[task.IndexHeap] { + wid := sh.OpenWindows[wnd].Worker + w := sh.Workers[wid] + + res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) + + if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) { + continue + } + + choices = append(choices, choice{ + selectedWindow: wnd, + needRes: res, + info: w.Info, + bestWid: wid, + }) + + } + + if len(choices) == 0 { + // all windows full + continue + } + + // chose randomly + randIndex := rand.Intn(len(choices)) + selectedWindow := choices[randIndex].selectedWindow + needRes := choices[randIndex].needRes + info := choices[randIndex].info + bestWid := choices[randIndex].bestWid + + log.Debugw("SCHED ASSIGNED", + "assigner", "darts", + "sqi", sqi, + "sector", task.Sector.ID.Number, + "task", task.TaskType, + "window", selectedWindow, + "worker", bestWid, + "choices", len(choices)) + + windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes) + windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) + + rmQueue = append(rmQueue, sqi) + scheduled++ + } + + if len(rmQueue) > 0 { + for i := len(rmQueue) - 1; i >= 0; i-- { + sh.SchedQueue.Remove(rmQueue[i]) + } + } + + return scheduled +} diff --git a/storage/sealer/sched_assigner_spread.go b/storage/sealer/sched_assigner_spread.go index f00d24d82..0a62b7406 100644 --- a/storage/sealer/sched_assigner_spread.go +++ b/storage/sealer/sched_assigner_spread.go @@ -6,76 +6,84 @@ import ( "github.com/filecoin-project/lotus/storage/sealer/storiface" ) -func NewSpreadAssigner() Assigner { +func NewSpreadAssigner(queued bool) Assigner { return &AssignerCommon{ - WindowSel: SpreadWS, + WindowSel: SpreadWS(queued), } } -func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { - scheduled := 0 - rmQueue := make([]int, 0, queueLen) - workerAssigned := map[storiface.WorkerID]int{} +func SpreadWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + scheduled := 0 + rmQueue := make([]int, 0, queueLen) + workerAssigned := map[storiface.WorkerID]int{} - for sqi := 0; sqi < queueLen; sqi++ { - task := (*sh.SchedQueue)[sqi] + for sqi := 0; sqi < queueLen; sqi++ { + task := (*sh.SchedQueue)[sqi] - selectedWindow := -1 - var needRes storiface.Resources - var info storiface.WorkerInfo - var bestWid storiface.WorkerID - bestAssigned := math.MaxInt // smaller = better + selectedWindow := -1 + var needRes storiface.Resources + var info storiface.WorkerInfo + var bestWid storiface.WorkerID + bestAssigned := math.MaxInt // smaller = better - for i, wnd := range acceptableWindows[task.IndexHeap] { - wid := sh.OpenWindows[wnd].Worker - w := sh.Workers[wid] + for i, wnd := range acceptableWindows[task.IndexHeap] { + wid := sh.OpenWindows[wnd].Worker + w := sh.Workers[wid] - res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) - log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) - if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) { + if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) { + continue + } + + wu, found := workerAssigned[wid] + if !found && queued { + wu = w.TaskCounts() + workerAssigned[wid] = wu + } + if wu >= bestAssigned { + continue + } + + info = w.Info + needRes = res + bestWid = wid + selectedWindow = wnd + bestAssigned = wu + } + + if selectedWindow < 0 { + // all windows full continue } - wu, _ := workerAssigned[wid] - if wu >= bestAssigned { - continue + log.Debugw("SCHED ASSIGNED", + "assigner", "spread", + "spread-queued", queued, + "sqi", sqi, + "sector", task.Sector.ID.Number, + "task", task.TaskType, + "window", selectedWindow, + "worker", bestWid, + "assigned", bestAssigned) + + workerAssigned[bestWid]++ + windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes) + windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) + + rmQueue = append(rmQueue, sqi) + scheduled++ + } + + if len(rmQueue) > 0 { + for i := len(rmQueue) - 1; i >= 0; i-- { + sh.SchedQueue.Remove(rmQueue[i]) } - - info = w.Info - needRes = res - bestWid = wid - selectedWindow = wnd - bestAssigned = wu } - if selectedWindow < 0 { - // all windows full - continue - } - - log.Debugw("SCHED ASSIGNED", - "sqi", sqi, - "sector", task.Sector.ID.Number, - "task", task.TaskType, - "window", selectedWindow, - "worker", bestWid, - "assigned", bestAssigned) - - workerAssigned[bestWid]++ - windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes) - windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) - - rmQueue = append(rmQueue, sqi) - scheduled++ + return scheduled } - - if len(rmQueue) > 0 { - for i := len(rmQueue) - 1; i >= 0; i-- { - sh.SchedQueue.Remove(rmQueue[i]) - } - } - - return scheduled } diff --git a/storage/sealer/sched_assigner_spread_tasks.go b/storage/sealer/sched_assigner_spread_tasks.go new file mode 100644 index 000000000..09cf98046 --- /dev/null +++ b/storage/sealer/sched_assigner_spread_tasks.go @@ -0,0 +1,98 @@ +package sealer + +import ( + "math" + + "github.com/filecoin-project/lotus/storage/sealer/sealtasks" + "github.com/filecoin-project/lotus/storage/sealer/storiface" +) + +func NewSpreadTasksAssigner(queued bool) Assigner { + return &AssignerCommon{ + WindowSel: SpreadTasksWS(queued), + } +} + +type widTask struct { + wid storiface.WorkerID + tt sealtasks.TaskType +} + +func SpreadTasksWS(queued bool) func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + return func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int { + scheduled := 0 + rmQueue := make([]int, 0, queueLen) + workerAssigned := map[widTask]int{} + + for sqi := 0; sqi < queueLen; sqi++ { + task := (*sh.SchedQueue)[sqi] + + selectedWindow := -1 + var needRes storiface.Resources + var info storiface.WorkerInfo + var bestWid widTask + bestAssigned := math.MaxInt // smaller = better + + for i, wnd := range acceptableWindows[task.IndexHeap] { + wid := sh.OpenWindows[wnd].Worker + w := sh.Workers[wid] + + res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType) + + log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i) + + if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) { + continue + } + + wt := widTask{wid: wid, tt: task.TaskType} + + wu, found := workerAssigned[wt] + if !found && queued { + st := task.SealTask() + wu = w.TaskCount(&st) + workerAssigned[wt] = wu + } + if wu >= bestAssigned { + continue + } + + info = w.Info + needRes = res + bestWid = wt + selectedWindow = wnd + bestAssigned = wu + } + + if selectedWindow < 0 { + // all windows full + continue + } + + log.Debugw("SCHED ASSIGNED", + "assigner", "spread-tasks", + "spread-queued", queued, + "sqi", sqi, + "sector", task.Sector.ID.Number, + "task", task.TaskType, + "window", selectedWindow, + "worker", bestWid, + "assigned", bestAssigned) + + workerAssigned[bestWid]++ + windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes) + windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task) + + rmQueue = append(rmQueue, sqi) + scheduled++ + } + + if len(rmQueue) > 0 { + for i := len(rmQueue) - 1; i >= 0; i-- { + sh.SchedQueue.Remove(rmQueue[i]) + } + } + + return scheduled + } +} diff --git a/storage/sealer/sched_assigner_utilization.go b/storage/sealer/sched_assigner_utilization.go index 2d051d000..1e75d904a 100644 --- a/storage/sealer/sched_assigner_utilization.go +++ b/storage/sealer/sched_assigner_utilization.go @@ -74,6 +74,7 @@ func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, } log.Debugw("SCHED ASSIGNED", + "assigner", "util", "sqi", sqi, "sector", task.Sector.ID.Number, "task", task.TaskType, diff --git a/storage/sealer/sched_resources.go b/storage/sealer/sched_resources.go index 487e294a2..c4ad7991d 100644 --- a/storage/sealer/sched_resources.go +++ b/storage/sealer/sched_resources.go @@ -170,6 +170,19 @@ func (a *ActiveResources) utilization(wr storiface.WorkerResources) float64 { // return max } +func (a *ActiveResources) taskCount(tt *sealtasks.SealTaskType) int { + // nil means all tasks + if tt == nil { + var count int + for _, c := range a.taskCounters { + count += c + } + return count + } + + return a.taskCounters[*tt] +} + func (wh *WorkerHandle) Utilization() float64 { wh.lk.Lock() u := wh.active.utilization(wh.Info.Resources) @@ -183,3 +196,31 @@ func (wh *WorkerHandle) Utilization() float64 { return u } + +func (wh *WorkerHandle) TaskCounts() int { + wh.lk.Lock() + u := wh.active.taskCount(nil) + u += wh.preparing.taskCount(nil) + wh.lk.Unlock() + wh.wndLk.Lock() + for _, window := range wh.activeWindows { + u += window.Allocated.taskCount(nil) + } + wh.wndLk.Unlock() + + return u +} + +func (wh *WorkerHandle) TaskCount(tt *sealtasks.SealTaskType) int { + wh.lk.Lock() + u := wh.active.taskCount(tt) + u += wh.preparing.taskCount(tt) + wh.lk.Unlock() + wh.wndLk.Lock() + for _, window := range wh.activeWindows { + u += window.Allocated.taskCount(tt) + } + wh.wndLk.Unlock() + + return u +}