Merge remote-tracking branch 'origin/master' into feat/worker-task-count-limits

This commit is contained in:
Łukasz Magiera 2022-05-27 16:01:32 +02:00
commit 26a0b43116
25 changed files with 935 additions and 530 deletions

View File

@ -534,6 +534,30 @@
# env var: LOTUS_STORAGE_ALLOWREGENSECTORKEY
#AllowRegenSectorKey = true
# Assigner specifies the worker assigner to use when scheduling tasks.
# "utilization" (default) - assign tasks to workers with lowest utilization.
# "spread" - assign tasks to as many distinct workers as possible.
#
# type: string
# env var: LOTUS_STORAGE_ASSIGNER
#Assigner = "utilization"
# DisallowRemoteFinalize when set to true will force all Finalize tasks to
# run on workers with local access to both long-term storage and the sealing
# path containing the sector.
# --
# WARNING: Only set this if all workers have access to long-term storage
# paths. If this flag is enabled, and there are workers without long-term
# storage access, sectors will not be moved from them, and Finalize tasks
# will appear to be stuck.
# --
# If you see stuck Finalize tasks after enabling this setting, check
# 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
#
# type: bool
# env var: LOTUS_STORAGE_DISALLOWREMOTEFINALIZE
#DisallowRemoteFinalize = false
# ResourceFiltering instructs the system which resource filtering strategy
# to use when evaluating tasks against this worker. An empty value defaults
# to "hardware".

View File

@ -62,7 +62,7 @@ type Manager struct {
remoteHnd *stores.FetchHandler
index stores.SectorIndex
sched *scheduler
sched *Scheduler
windowPoStSched *poStScheduler
winningPoStSched *poStScheduler
@ -71,7 +71,8 @@ type Manager struct {
workLk sync.Mutex
work *statestore.StateStore
parallelCheckLimit int
parallelCheckLimit int
disallowRemoteFinalize bool
callToWork map[storiface.CallID]WorkID
// used when we get an early return and there's no callToWork mapping
@ -122,6 +123,10 @@ type Config struct {
// PoSt config
ParallelCheckLimit int
DisallowRemoteFinalize bool
Assigner string
}
type StorageAuth http.Header
@ -135,6 +140,11 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
return nil, xerrors.Errorf("creating prover instance: %w", err)
}
sh, err := newScheduler(sc.Assigner)
if err != nil {
return nil, err
}
m := &Manager{
ls: ls,
storage: stor,
@ -142,13 +152,14 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}},
index: si,
sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),
localProver: prover,
parallelCheckLimit: sc.ParallelCheckLimit,
parallelCheckLimit: sc.ParallelCheckLimit,
disallowRemoteFinalize: sc.DisallowRemoteFinalize,
work: mss,
callToWork: map[storiface.CallID]WorkID{},
@ -585,6 +596,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return xerrors.Errorf("acquiring sector lock: %w", err)
}
// first check if the unsealed file exists anywhere; if it doesn't, ignore it
unsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@ -597,6 +609,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}
// Make sure that the sealed file is still in sealing storage; In case it already
// isn't, we want to do finalize in long-term storage
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
@ -612,6 +626,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}
// do the cache trimming wherever the likely still very large cache lives.
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
@ -624,7 +640,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
return err
}
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
// only move the unsealed file if it still exists and needs moving
moveUnsealed := unsealed
{
if len(keepUnsealed) == 0 {
@ -632,6 +651,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
}
}
// move stuff to long-term storage
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error {
@ -653,6 +673,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
return xerrors.Errorf("acquiring sector lock: %w", err)
}
// first check if the unsealed file exists anywhere; if it doesn't, ignore it
moveUnsealed := storiface.FTUnsealed
{
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
@ -665,6 +686,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}
// Make sure that the update file is still in sealing storage; In case it already
// isn't, we want to do finalize in long-term storage
pathType := storiface.PathStorage
{
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
@ -680,7 +703,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
}
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
// do the cache trimming wherever the likely still large cache lives.
// we really don't want to move it.
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
@ -693,7 +718,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
}
move := func(types storiface.SectorFileType) error {
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
// get a selector for moving stuff into long-term storage
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
{
if len(keepUnsealed) == 0 {
moveUnsealed = storiface.FTNone

View File

@ -109,6 +109,9 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})
sh, err := newScheduler("")
require.NoError(t, err)
m := &Manager{
ls: st,
storage: stor,
@ -116,7 +119,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
remoteHnd: &stores.FetchHandler{Local: lstor},
index: si,
sched: newScheduler(),
sched: sh,
windowPoStSched: newPoStScheduler(sealtasks.TTGenerateWindowPoSt),
winningPoStSched: newPoStScheduler(sealtasks.TTGenerateWinningPoSt),

View File

@ -2,34 +2,34 @@ package sectorstorage
import "sort"
type requestQueue []*workerRequest
type RequestQueue []*WorkerRequest
func (q requestQueue) Len() int { return len(q) }
func (q RequestQueue) Len() int { return len(q) }
func (q requestQueue) Less(i, j int) bool {
oneMuchLess, muchLess := q[i].taskType.MuchLess(q[j].taskType)
func (q RequestQueue) Less(i, j int) bool {
oneMuchLess, muchLess := q[i].TaskType.MuchLess(q[j].TaskType)
if oneMuchLess {
return muchLess
}
if q[i].priority != q[j].priority {
return q[i].priority > q[j].priority
if q[i].Priority != q[j].Priority {
return q[i].Priority > q[j].Priority
}
if q[i].taskType != q[j].taskType {
return q[i].taskType.Less(q[j].taskType)
if q[i].TaskType != q[j].TaskType {
return q[i].TaskType.Less(q[j].TaskType)
}
return q[i].sector.ID.Number < q[j].sector.ID.Number // optimize minerActor.NewSectors bitfield
return q[i].Sector.ID.Number < q[j].Sector.ID.Number // optimize minerActor.NewSectors bitfield
}
func (q requestQueue) Swap(i, j int) {
func (q RequestQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
func (q *requestQueue) Push(x *workerRequest) {
func (q *RequestQueue) Push(x *WorkerRequest) {
n := len(*q)
item := x
item.index = n
@ -37,7 +37,7 @@ func (q *requestQueue) Push(x *workerRequest) {
sort.Sort(q)
}
func (q *requestQueue) Remove(i int) *workerRequest {
func (q *RequestQueue) Remove(i int) *WorkerRequest {
old := *q
n := len(old)
item := old[i]

View File

@ -8,13 +8,13 @@ import (
)
func TestRequestQueue(t *testing.T) {
rq := &requestQueue{}
rq := &RequestQueue{}
rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit2})
rq.Push(&workerRequest{taskType: sealtasks.TTPreCommit1})
rq.Push(&workerRequest{taskType: sealtasks.TTAddPiece})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit2})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTPreCommit1})
rq.Push(&WorkerRequest{TaskType: sealtasks.TTAddPiece})
dump := func(s string) {
fmt.Println("---")
@ -22,7 +22,7 @@ func TestRequestQueue(t *testing.T) {
for sqi := 0; sqi < rq.Len(); sqi++ {
task := (*rq)[sqi]
fmt.Println(sqi, task.taskType)
fmt.Println(sqi, task.TaskType)
}
}
@ -32,31 +32,31 @@ func TestRequestQueue(t *testing.T) {
dump("pop 1")
if pt.taskType != sealtasks.TTPreCommit2 {
t.Error("expected precommit2, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit2 {
t.Error("expected precommit2, got", pt.TaskType)
}
pt = rq.Remove(0)
dump("pop 2")
if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.TaskType)
}
pt = rq.Remove(1)
dump("pop 3")
if pt.taskType != sealtasks.TTAddPiece {
t.Error("expected addpiece, got", pt.taskType)
if pt.TaskType != sealtasks.TTAddPiece {
t.Error("expected addpiece, got", pt.TaskType)
}
pt = rq.Remove(0)
dump("pop 4")
if pt.taskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.taskType)
if pt.TaskType != sealtasks.TTPreCommit1 {
t.Error("expected precommit1, got", pt.TaskType)
}
}

View File

@ -2,9 +2,6 @@ package sectorstorage
import (
"context"
"math"
"math/rand"
"sort"
"sync"
"time"
@ -47,23 +44,28 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error
type WorkerSelector interface {
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
}
type scheduler struct {
workersLk sync.RWMutex
workers map[storiface.WorkerID]*workerHandle
type Scheduler struct {
assigner Assigner
schedule chan *workerRequest
windowRequests chan *schedWindowRequest
workersLk sync.RWMutex
Workers map[storiface.WorkerID]*WorkerHandle
schedule chan *WorkerRequest
windowRequests chan *SchedWindowRequest
workerChange chan struct{} // worker added / changed/freed resources
workerDisable chan workerDisableReq
// owned by the sh.runSched goroutine
schedQueue *requestQueue
openWindows []*schedWindowRequest
SchedQueue *RequestQueue
OpenWindows []*SchedWindowRequest
workTracker *workTracker
@ -74,24 +76,24 @@ type scheduler struct {
testSync chan struct{} // used for testing
}
type workerHandle struct {
type WorkerHandle struct {
workerRpc Worker
tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex
info storiface.WorkerInfo
Info storiface.WorkerInfo
preparing *activeResources // use with workerHandle.lk
active *activeResources // use with workerHandle.lk
preparing *activeResources // use with WorkerHandle.lk
active *activeResources // use with WorkerHandle.lk
lk sync.Mutex // can be taken inside sched.workersLk.RLock
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
activeWindows []*schedWindow
activeWindows []*SchedWindow
enabled bool
Enabled bool
// for sync manager goroutine closing
cleanupStarted bool
@ -99,28 +101,28 @@ type workerHandle struct {
closingMgr chan struct{}
}
type schedWindowRequest struct {
worker storiface.WorkerID
type SchedWindowRequest struct {
Worker storiface.WorkerID
done chan *schedWindow
Done chan *SchedWindow
}
type schedWindow struct {
allocated activeResources
todo []*workerRequest
type SchedWindow struct {
Allocated activeResources
Todo []*WorkerRequest
}
type workerDisableReq struct {
activeWindows []*schedWindow
activeWindows []*SchedWindow
wid storiface.WorkerID
done func()
}
type workerRequest struct {
sector storage.SectorRef
taskType sealtasks.TaskType
priority int // larger values more important
sel WorkerSelector
type WorkerRequest struct {
Sector storage.SectorRef
TaskType sealtasks.TaskType
Priority int // larger values more important
Sel WorkerSelector
prepare WorkerAction
work WorkerAction
@ -129,25 +131,37 @@ type workerRequest struct {
index int // The index of the item in the heap.
indexHeap int
IndexHeap int
ret chan<- workerResponse
ctx context.Context
Ctx context.Context
}
type workerResponse struct {
err error
}
func newScheduler() *scheduler {
return &scheduler{
workers: map[storiface.WorkerID]*workerHandle{},
func newScheduler(assigner string) (*Scheduler, error) {
var a Assigner
switch assigner {
case "", "utilization":
a = NewLowestUtilizationAssigner()
case "spread":
a = NewSpreadAssigner()
default:
return nil, xerrors.Errorf("unknown assigner '%s'", assigner)
}
schedule: make(chan *workerRequest),
windowRequests: make(chan *schedWindowRequest, 20),
return &Scheduler{
assigner: a,
Workers: map[storiface.WorkerID]*WorkerHandle{},
schedule: make(chan *WorkerRequest),
windowRequests: make(chan *SchedWindowRequest, 20),
workerChange: make(chan struct{}, 20),
workerDisable: make(chan workerDisableReq),
schedQueue: &requestQueue{},
SchedQueue: &RequestQueue{},
workTracker: &workTracker{
done: map[storiface.CallID]struct{}{},
@ -159,18 +173,18 @@ func newScheduler() *scheduler {
closing: make(chan struct{}),
closed: make(chan struct{}),
}
}, nil
}
func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
func (sh *Scheduler) Schedule(ctx context.Context, sector storage.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
ret := make(chan workerResponse)
select {
case sh.schedule <- &workerRequest{
sector: sector,
taskType: taskType,
priority: getPriority(ctx),
sel: sel,
case sh.schedule <- &WorkerRequest{
Sector: sector,
TaskType: taskType,
Priority: getPriority(ctx),
Sel: sel,
prepare: prepare,
work: work,
@ -178,7 +192,7 @@ func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, tas
start: time.Now(),
ret: ret,
ctx: ctx,
Ctx: ctx,
}:
case <-sh.closing:
return xerrors.New("closing")
@ -196,18 +210,18 @@ func (sh *scheduler) Schedule(ctx context.Context, sector storage.SectorRef, tas
}
}
func (r *workerRequest) respond(err error) {
func (r *WorkerRequest) respond(err error) {
select {
case r.ret <- workerResponse{err: err}:
case <-r.ctx.Done():
case <-r.Ctx.Done():
log.Warnf("request got cancelled before we could respond")
}
}
func (r *workerRequest) SealTask() sealtasks.SealTaskType {
func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
return sealtasks.SealTaskType{
TaskType: r.taskType,
RegisteredSealProof: r.sector.ProofType,
TaskType: r.TaskType,
RegisteredSealProof: r.Sector.ProofType,
}
}
@ -222,7 +236,7 @@ type SchedDiagInfo struct {
OpenWindows []string
}
func (sh *scheduler) runSched() {
func (sh *Scheduler) runSched() {
defer close(sh.closed)
iw := time.After(InitWait)
@ -239,14 +253,14 @@ func (sh *scheduler) runSched() {
toDisable = append(toDisable, dreq)
doSched = true
case req := <-sh.schedule:
sh.schedQueue.Push(req)
sh.SchedQueue.Push(req)
doSched = true
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.OpenWindows = append(sh.OpenWindows, req)
doSched = true
case ireq := <-sh.info:
ireq(sh.diag())
@ -270,12 +284,12 @@ func (sh *scheduler) runSched() {
case dreq := <-sh.workerDisable:
toDisable = append(toDisable, dreq)
case req := <-sh.schedule:
sh.schedQueue.Push(req)
sh.SchedQueue.Push(req)
if sh.testSync != nil {
sh.testSync <- struct{}{}
}
case req := <-sh.windowRequests:
sh.openWindows = append(sh.openWindows, req)
sh.OpenWindows = append(sh.OpenWindows, req)
default:
break loop
}
@ -283,21 +297,21 @@ func (sh *scheduler) runSched() {
for _, req := range toDisable {
for _, window := range req.activeWindows {
for _, request := range window.todo {
sh.schedQueue.Push(request)
for _, request := range window.Todo {
sh.SchedQueue.Push(request)
}
}
openWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
for _, window := range sh.openWindows {
if window.worker != req.wid {
openWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows))
for _, window := range sh.OpenWindows {
if window.Worker != req.wid {
openWindows = append(openWindows, window)
}
}
sh.openWindows = openWindows
sh.OpenWindows = openWindows
sh.workersLk.Lock()
sh.workers[req.wid].enabled = false
sh.Workers[req.wid].Enabled = false
sh.workersLk.Unlock()
req.done()
@ -309,284 +323,51 @@ func (sh *scheduler) runSched() {
}
}
func (sh *scheduler) diag() SchedDiagInfo {
func (sh *Scheduler) diag() SchedDiagInfo {
var out SchedDiagInfo
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi]
for sqi := 0; sqi < sh.SchedQueue.Len(); sqi++ {
task := (*sh.SchedQueue)[sqi]
out.Requests = append(out.Requests, SchedDiagRequestInfo{
Sector: task.sector.ID,
TaskType: task.taskType,
Priority: task.priority,
Sector: task.Sector.ID,
TaskType: task.TaskType,
Priority: task.Priority,
})
}
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
for _, window := range sh.openWindows {
out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.worker).String())
for _, window := range sh.OpenWindows {
out.OpenWindows = append(out.OpenWindows, uuid.UUID(window.Worker).String())
}
return out
}
func (sh *scheduler) trySched() {
/*
This assigns tasks to workers based on:
- Task priority (achieved by handling sh.schedQueue in order, since it's already sorted by priority)
- Worker resource availability
- Task-specified worker preference (acceptableWindows array below sorted by this preference)
- Window request age
1. For each task in the schedQueue find windows which can handle them
1.1. Create list of windows capable of handling a task
1.2. Sort windows according to task selector preferences
2. Going through schedQueue again, assign task to first acceptable window
with resources available
3. Submit windows with scheduled tasks to workers
*/
type Assigner interface {
TrySched(sh *Scheduler)
}
func (sh *Scheduler) trySched() {
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
windowsLen := len(sh.openWindows)
queueLen := sh.schedQueue.Len()
log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)
if windowsLen == 0 || queueLen == 0 {
// nothing to schedule on
return
}
windows := make([]schedWindow, windowsLen)
for i := range windows {
windows[i].allocated = *newActiveResources()
}
acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex
// Step 1
throttle := make(chan struct{}, windowsLen)
var wg sync.WaitGroup
wg.Add(queueLen)
for i := 0; i < queueLen; i++ {
throttle <- struct{}{}
go func(sqi int) {
defer wg.Done()
defer func() {
<-throttle
}()
task := (*sh.schedQueue)[sqi]
task.indexHeap = sqi
for wnd, windowRequest := range sh.openWindows {
worker, ok := sh.workers[windowRequest.worker]
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.worker)
// TODO: How to move forward here?
continue
}
if !worker.enabled {
log.Debugw("skipping disabled worker", "worker", windowRequest.worker)
continue
}
needRes := worker.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(task.SealTask(), needRes, windowRequest.worker, "schedAcceptable", worker.info) {
continue
}
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
ok, err := task.sel.Ok(rpcCtx, task.taskType, task.sector.ProofType, worker)
cancel()
if err != nil {
log.Errorf("trySched(1) req.sel.Ok error: %+v", err)
continue
}
if !ok {
continue
}
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
}
if len(acceptableWindows[sqi]) == 0 {
return
}
// Pick best worker (shuffle in case some workers are equally as good)
rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) {
acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint
})
sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool {
wii := sh.openWindows[acceptableWindows[sqi][i]].worker // nolint:scopelint
wji := sh.openWindows[acceptableWindows[sqi][j]].worker // nolint:scopelint
if wii == wji {
// for the same worker prefer older windows
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}
wi := sh.workers[wii]
wj := sh.workers[wji]
rpcCtx, cancel := context.WithTimeout(task.ctx, SelectorTimeout)
defer cancel()
r, err := task.sel.Cmp(rpcCtx, task.taskType, wi, wj)
if err != nil {
log.Errorf("selecting best worker: %s", err)
}
return r
})
}(i)
}
wg.Wait()
log.Debugf("SCHED windows: %+v", windows)
log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)
// Step 2
scheduled := 0
rmQueue := make([]int, 0, queueLen)
workerUtil := map[storiface.WorkerID]float64{}
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
selectedWindow := -1
var needRes storiface.Resources
var info storiface.WorkerInfo
var bestWid storiface.WorkerID
bestUtilization := math.MaxFloat64 // smaller = better
for i, wnd := range acceptableWindows[task.indexHeap] {
wid := sh.openWindows[wnd].worker
w := sh.workers[wid]
res := w.info.Resources.ResourceSpec(task.sector.ProofType, task.taskType)
log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.sector.ID.Number, wnd, i)
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(task.SealTask(), res, wid, "schedAssign", w.info) {
continue
}
wu, found := workerUtil[wid]
if !found {
wu = w.utilization()
workerUtil[wid] = wu
}
if wu >= bestUtilization {
// acceptable worker list is initially sorted by utilization, and the initially-best workers
// will be assigned tasks first. This means that if we find a worker which isn't better, it
// probably means that the other workers aren't better either.
//
// utilization
// ^
// | /
// | \ /
// | \ /
// | *
// #--------> acceptableWindow index
//
// * -> we're here
break
}
info = w.info
needRes = res
bestWid = wid
selectedWindow = wnd
bestUtilization = wu
}
if selectedWindow < 0 {
// all windows full
continue
}
log.Debugw("SCHED ASSIGNED",
"sqi", sqi,
"sector", task.sector.ID.Number,
"task", task.taskType,
"window", selectedWindow,
"worker", bestWid,
"utilization", bestUtilization)
workerUtil[bestWid] += windows[selectedWindow].allocated.add(task.SealTask(), info.Resources, needRes)
windows[selectedWindow].todo = append(windows[selectedWindow].todo, task)
rmQueue = append(rmQueue, sqi)
scheduled++
}
if len(rmQueue) > 0 {
for i := len(rmQueue) - 1; i >= 0; i-- {
sh.schedQueue.Remove(rmQueue[i])
}
}
// Step 3
if scheduled == 0 {
return
}
scheduledWindows := map[int]struct{}{}
for wnd, window := range windows {
if len(window.todo) == 0 {
// Nothing scheduled here, keep the window open
continue
}
scheduledWindows[wnd] = struct{}{}
window := window // copy
select {
case sh.openWindows[wnd].done <- &window:
default:
log.Error("expected sh.openWindows[wnd].done to be buffered")
}
}
// Rewrite sh.openWindows array, removing scheduled windows
newOpenWindows := make([]*schedWindowRequest, 0, windowsLen-len(scheduledWindows))
for wnd, window := range sh.openWindows {
if _, scheduled := scheduledWindows[wnd]; scheduled {
// keep unscheduled windows open
continue
}
newOpenWindows = append(newOpenWindows, window)
}
sh.openWindows = newOpenWindows
sh.assigner.TrySched(sh)
}
func (sh *scheduler) schedClose() {
func (sh *Scheduler) schedClose() {
sh.workersLk.Lock()
defer sh.workersLk.Unlock()
log.Debugf("closing scheduler")
for i, w := range sh.workers {
for i, w := range sh.Workers {
sh.workerCleanup(i, w)
}
}
func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {
func (sh *Scheduler) Info(ctx context.Context) (interface{}, error) {
ch := make(chan interface{}, 1)
sh.info <- func(res interface{}) {
@ -601,7 +382,7 @@ func (sh *scheduler) Info(ctx context.Context) (interface{}, error) {
}
}
func (sh *scheduler) Close(ctx context.Context) error {
func (sh *Scheduler) Close(ctx context.Context) error {
close(sh.closing)
select {
case <-sh.closed:

View File

@ -0,0 +1,192 @@
package sectorstorage
import (
"context"
"math/rand"
"sort"
"sync"
)
// WindowSelector implements the policy half of task assignment: given the
// scheduler state, the queue length, the per-task lists of acceptable open
// window indexes (built in AssignerCommon.TrySched, step 1), and the
// candidate windows, it assigns queued tasks to windows and returns the
// number of tasks it scheduled.
type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int

// AssignerCommon is a task assigner with customizable parts
type AssignerCommon struct {
	// WindowSel chooses which open window (and therefore which worker)
	// each queued task is assigned to.
	WindowSel WindowSelector
}

// compile-time check that AssignerCommon satisfies the Assigner interface
var _ Assigner = &AssignerCommon{}
// TrySched attempts to assign queued tasks to open scheduling windows.
// It performs the selector/resource filtering common to all assigners
// (steps 1 and 3 below) and delegates the actual task->window choice
// (step 2) to a.WindowSel.
//
// NOTE(review): callers appear to invoke this with sh.workersLk already
// held for reading (see Scheduler.trySched) — confirm before calling from
// elsewhere.
func (a *AssignerCommon) TrySched(sh *Scheduler) {
	/*
		This assigns tasks to workers based on:
		- Task priority (achieved by handling sh.SchedQueue in order, since it's already sorted by priority)
		- Worker resource availability
		- Task-specified worker preference (acceptableWindows array below sorted by this preference)
		- Window request age

		1. For each task in the SchedQueue find windows which can handle them
		1.1. Create list of windows capable of handling a task
		1.2. Sort windows according to task selector preferences
		2. Going through SchedQueue again, assign task to first acceptable window
		   with resources available
		3. Submit windows with scheduled tasks to workers
	*/

	windowsLen := len(sh.OpenWindows)
	queueLen := sh.SchedQueue.Len()

	log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)

	if windowsLen == 0 || queueLen == 0 {
		// nothing to schedule on
		return
	}

	// one candidate window (with its own resource accounting) per open window request
	windows := make([]SchedWindow, windowsLen)
	for i := range windows {
		windows[i].Allocated = *NewActiveResources()
	}
	acceptableWindows := make([][]int, queueLen) // QueueIndex -> []OpenWindowIndex

	// Step 1: in parallel (bounded by windowsLen in-flight goroutines), find
	// for each queued task the set of windows whose worker can run it.
	throttle := make(chan struct{}, windowsLen)

	var wg sync.WaitGroup
	wg.Add(queueLen)
	for i := 0; i < queueLen; i++ {
		throttle <- struct{}{}

		go func(sqi int) {
			defer wg.Done()
			defer func() {
				<-throttle
			}()

			task := (*sh.SchedQueue)[sqi]
			task.IndexHeap = sqi

			// set once a preferred worker has been seen; after that, only
			// preferred workers are kept as candidates for this task
			var havePreferred bool

			for wnd, windowRequest := range sh.OpenWindows {
				worker, ok := sh.Workers[windowRequest.Worker]
				if !ok {
					log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker)
					// TODO: How to move forward here?
					continue
				}

				if !worker.Enabled {
					log.Debugw("skipping disabled worker", "worker", windowRequest.Worker)
					continue
				}

				needRes := worker.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)

				// skip windows whose worker lacks the resources for this task
				// TODO: allow bigger windows
				if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), needRes, windowRequest.Worker, "schedAcceptable", worker.Info) {
					continue
				}

				// ask the task's selector whether this worker is acceptable
				// (and whether it is preferred over ordinary candidates)
				rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
				ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
				cancel()
				if err != nil {
					log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
					continue
				}

				if !ok {
					continue
				}

				if havePreferred && !preferred {
					// we have a way better worker for this task
					continue
				}

				if preferred && !havePreferred {
					// all workers we considered previously are much worse choice
					acceptableWindows[sqi] = acceptableWindows[sqi][:0]
					havePreferred = true
				}

				acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
			}

			if len(acceptableWindows[sqi]) == 0 {
				return
			}

			// Pick best worker (shuffle in case some workers are equally as good)
			rand.Shuffle(len(acceptableWindows[sqi]), func(i, j int) {
				acceptableWindows[sqi][i], acceptableWindows[sqi][j] = acceptableWindows[sqi][j], acceptableWindows[sqi][i] // nolint:scopelint
			})
			sort.SliceStable(acceptableWindows[sqi], func(i, j int) bool {
				wii := sh.OpenWindows[acceptableWindows[sqi][i]].Worker // nolint:scopelint
				wji := sh.OpenWindows[acceptableWindows[sqi][j]].Worker // nolint:scopelint

				if wii == wji {
					// for the same worker prefer older windows
					return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
				}

				wi := sh.Workers[wii]
				wj := sh.Workers[wji]

				// selector decides which of two acceptable workers is better
				rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
				defer cancel()

				r, err := task.Sel.Cmp(rpcCtx, task.TaskType, wi, wj)
				if err != nil {
					log.Errorf("selecting best worker: %s", err)
				}
				return r
			})
		}(i)
	}

	wg.Wait()

	log.Debugf("SCHED windows: %+v", windows)
	log.Debugf("SCHED Acceptable win: %+v", acceptableWindows)

	// Step 2: policy-specific assignment of tasks to candidate windows
	scheduled := a.WindowSel(sh, queueLen, acceptableWindows, windows)

	// Step 3: hand the filled windows back to the workers that requested them
	if scheduled == 0 {
		return
	}

	scheduledWindows := map[int]struct{}{}
	for wnd, window := range windows {
		if len(window.Todo) == 0 {
			// Nothing scheduled here, keep the window open
			continue
		}

		scheduledWindows[wnd] = struct{}{}

		window := window // copy
		select {
		case sh.OpenWindows[wnd].Done <- &window:
		default:
			// Done is expected to be buffered (see SchedWindowRequest creation);
			// a blocked send here would indicate a programming error
			log.Error("expected sh.OpenWindows[wnd].Done to be buffered")
		}
	}

	// Rewrite sh.OpenWindows array, removing scheduled windows
	newOpenWindows := make([]*SchedWindowRequest, 0, windowsLen-len(scheduledWindows))
	for wnd, window := range sh.OpenWindows {
		if _, scheduled := scheduledWindows[wnd]; scheduled {
			// this window was filled and sent to its worker; drop it
			continue
		}

		newOpenWindows = append(newOpenWindows, window)
	}
	sh.OpenWindows = newOpenWindows
}

View File

@ -0,0 +1,81 @@
package sectorstorage
import (
"math"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// NewSpreadAssigner creates an Assigner which spreads tasks across as many
// distinct workers as possible, using SpreadWS as its window-selection policy.
func NewSpreadAssigner() Assigner {
	assigner := &AssignerCommon{
		WindowSel: SpreadWS,
	}
	return assigner
}
// SpreadWS is a WindowSelector which spreads tasks across workers: for each
// queued task (walked in queue order, which is already priority-sorted) it
// picks, among the task's acceptable windows, the worker which has been
// assigned the fewest tasks during this scheduling pass and still has the
// resources to run the task.
//
// Scheduled tasks are appended to the chosen window's Todo list, their
// resources are recorded in the window's Allocated accounting, and they are
// removed from sh.SchedQueue. Returns the number of tasks scheduled.
func SpreadWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
	scheduled := 0
	rmQueue := make([]int, 0, queueLen)
	// tasks assigned to each worker so far in this pass; missing key == 0
	workerAssigned := map[storiface.WorkerID]int{}

	for sqi := 0; sqi < queueLen; sqi++ {
		task := (*sh.SchedQueue)[sqi]

		selectedWindow := -1
		var needRes storiface.Resources
		var info storiface.WorkerInfo
		var bestWid storiface.WorkerID
		bestAssigned := math.MaxInt // smaller = better

		for i, wnd := range acceptableWindows[task.IndexHeap] {
			wid := sh.OpenWindows[wnd].Worker
			w := sh.Workers[wid]

			res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)

			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

			// skip windows which no longer have room for this task
			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
				continue
			}

			// zero value (no tasks yet) for workers not seen this pass;
			// plain single-value map read — no need for the two-value form
			wu := workerAssigned[wid]
			if wu >= bestAssigned {
				continue
			}

			info = w.Info
			needRes = res
			bestWid = wid
			selectedWindow = wnd
			bestAssigned = wu
		}

		if selectedWindow < 0 {
			// all windows full
			continue
		}

		log.Debugw("SCHED ASSIGNED",
			"sqi", sqi,
			"sector", task.Sector.ID.Number,
			"task", task.TaskType,
			"window", selectedWindow,
			"worker", bestWid,
			"assigned", bestAssigned)

		workerAssigned[bestWid]++
		windows[selectedWindow].Allocated.Add(task.SealTask(), info.Resources, needRes)
		windows[selectedWindow].Todo = append(windows[selectedWindow].Todo, task)

		rmQueue = append(rmQueue, sqi)
		scheduled++
	}

	// remove scheduled tasks in reverse so earlier queue indexes stay valid
	if len(rmQueue) > 0 {
		for i := len(rmQueue) - 1; i >= 0; i-- {
			sh.SchedQueue.Remove(rmQueue[i])
		}
	}

	return scheduled
}

View File

@ -0,0 +1,98 @@
package sectorstorage
import (
"math"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// NewLowestUtilizationAssigner creates an Assigner which routes tasks to the
// worker with the lowest current utilization, using LowestUtilizationWS as
// the window selector.
func NewLowestUtilizationAssigner() Assigner {
	assigner := &AssignerCommon{
		WindowSel: LowestUtilizationWS,
	}
	return assigner
}
// LowestUtilizationWS is a window selector which assigns each queued task to
// the acceptable worker with the lowest utilization. Utilization is sampled
// once per worker per pass and incremented locally as tasks are assigned.
//
// It mutates windows (adding tasks to Todo and reserving resources) and
// removes the scheduled entries from sh.SchedQueue, returning the number of
// tasks it scheduled.
func LowestUtilizationWS(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int {
	assigned := 0
	toRemove := make([]int, 0, queueLen)
	// cached per-worker utilization for this pass
	utilCache := map[storiface.WorkerID]float64{}

	for sqi := 0; sqi < queueLen; sqi++ {
		task := (*sh.SchedQueue)[sqi]

		var (
			bestWindow = -1
			bestUtil   = math.MaxFloat64 // smaller = better
			bestWid    storiface.WorkerID
			bestRes    storiface.Resources
			bestInfo   storiface.WorkerInfo
		)

		for i, wnd := range acceptableWindows[task.IndexHeap] {
			wid := sh.OpenWindows[wnd].Worker
			w := sh.Workers[wid]

			res := w.Info.Resources.ResourceSpec(task.Sector.ProofType, task.TaskType)

			log.Debugf("SCHED try assign sqi:%d sector %d to window %d (awi:%d)", sqi, task.Sector.ID.Number, wnd, i)

			// TODO: allow bigger windows
			if !windows[wnd].Allocated.CanHandleRequest(task.SealTask(), res, wid, "schedAssign", w.Info) {
				continue
			}

			wu, cached := utilCache[wid]
			if !cached {
				wu = w.Utilization()
				utilCache[wid] = wu
			}
			if wu >= bestUtil {
				// acceptableWindows is initially sorted by utilization, so once a
				// candidate is no better than the current best, the remaining
				// candidates are very unlikely to be better either — stop looking.
				break
			}

			bestInfo = w.Info
			bestRes = res
			bestWid = wid
			bestWindow = wnd
			bestUtil = wu
		}

		if bestWindow < 0 {
			// every acceptable window is at capacity
			continue
		}

		log.Debugw("SCHED ASSIGNED",
			"sqi", sqi,
			"sector", task.Sector.ID.Number,
			"task", task.TaskType,
			"window", bestWindow,
			"worker", bestWid,
			"utilization", bestUtil)

		// account the added load against the cached utilization so later
		// iterations in this pass see the worker as busier
		utilCache[bestWid] += windows[bestWindow].Allocated.Add(task.SealTask(), bestInfo.Resources, bestRes)
		windows[bestWindow].Todo = append(windows[bestWindow].Todo, task)

		toRemove = append(toRemove, sqi)
		assigned++
	}

	// remove in reverse so earlier indexes stay valid
	for i := len(toRemove) - 1; i >= 0; i-- {
		sh.SchedQueue.Remove(toRemove[i])
	}

	return assigned
}

View File

@ -17,7 +17,7 @@ import (
type poStScheduler struct {
lk sync.RWMutex
workers map[storiface.WorkerID]*workerHandle
workers map[storiface.WorkerID]*WorkerHandle
cond *sync.Cond
postType sealtasks.TaskType
@ -25,14 +25,14 @@ type poStScheduler struct {
func newPoStScheduler(t sealtasks.TaskType) *poStScheduler {
ps := &poStScheduler{
workers: map[storiface.WorkerID]*workerHandle{},
workers: map[storiface.WorkerID]*WorkerHandle{},
postType: t,
}
ps.cond = sync.NewCond(&ps.lk)
return ps
}
func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *workerHandle) bool {
func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealtasks.TaskType]struct{}, w *WorkerHandle) bool {
if _, ok := tasks[ps.postType]; !ok {
return false
}
@ -49,10 +49,10 @@ func (ps *poStScheduler) MaybeAddWorker(wid storiface.WorkerID, tasks map[sealta
return true
}
func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *workerHandle {
func (ps *poStScheduler) delWorker(wid storiface.WorkerID) *WorkerHandle {
ps.lk.Lock()
defer ps.lk.Unlock()
var w *workerHandle = nil
var w *WorkerHandle = nil
if wh, ok := ps.workers[wid]; ok {
w = wh
delete(ps.workers, wid)
@ -68,7 +68,7 @@ func (ps *poStScheduler) CanSched(ctx context.Context) bool {
}
for _, w := range ps.workers {
if w.enabled {
if w.Enabled {
return true
}
}
@ -105,7 +105,7 @@ func (ps *poStScheduler) Schedule(ctx context.Context, primary bool, spt abi.Reg
selected := candidates[0]
worker := ps.workers[selected.id]
return worker.active.withResources(selected.id, worker.info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
return worker.active.withResources(selected.id, worker.Info, ps.postType.SealTask(spt), selected.res, &ps.lk, func() error {
ps.lk.Unlock()
defer ps.lk.Lock()
@ -122,9 +122,9 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
var accepts []candidateWorker
//if the gpus of the worker are insufficient or it's disabled, it cannot be scheduled
for wid, wr := range ps.workers {
needRes := wr.info.Resources.ResourceSpec(spt, ps.postType)
needRes := wr.Info.Resources.ResourceSpec(spt, ps.postType)
if !wr.active.canHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.info) {
if !wr.active.CanHandleRequest(ps.postType.SealTask(spt), needRes, wid, "post-readyWorkers", wr.Info) {
continue
}
@ -145,16 +145,16 @@ func (ps *poStScheduler) readyWorkers(spt abi.RegisteredSealProof) (bool, []cand
func (ps *poStScheduler) disable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = false
ps.workers[wid].Enabled = false
}
func (ps *poStScheduler) enable(wid storiface.WorkerID) {
ps.lk.Lock()
defer ps.lk.Unlock()
ps.workers[wid].enabled = true
ps.workers[wid].Enabled = true
}
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) {
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *WorkerHandle) {
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
defer heartbeatTimer.Stop()
@ -197,7 +197,7 @@ func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *workerHandle) {
}
}
func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
func (ps *poStScheduler) workerCleanup(wid storiface.WorkerID, w *WorkerHandle) {
select {
case <-w.closingMgr:
default:
@ -223,7 +223,7 @@ func (ps *poStScheduler) schedClose() {
}
}
func (ps *poStScheduler) WorkerStats(ctx context.Context, cb func(ctx context.Context, wid storiface.WorkerID, worker *workerHandle)) {
func (ps *poStScheduler) WorkerStats(ctx context.Context, cb func(ctx context.Context, wid storiface.WorkerID, worker *WorkerHandle)) {
ps.lk.RLock()
defer ps.lk.RUnlock()
for id, w := range ps.workers {

View File

@ -21,14 +21,14 @@ type activeResources struct {
waiting int
}
func newActiveResources() *activeResources {
func NewActiveResources() *activeResources {
return &activeResources{
taskCounters: map[sealtasks.SealTaskType]int{},
}
}
func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.WorkerInfo, tt sealtasks.SealTaskType, r storiface.Resources, locker sync.Locker, cb func() error) error {
for !a.canHandleRequest(tt, r, id, "withResources", wr) {
for !a.CanHandleRequest(tt, r, id, "withResources", wr) {
if a.cond == nil {
a.cond = sync.NewCond(locker)
}
@ -37,11 +37,11 @@ func (a *activeResources) withResources(id storiface.WorkerID, wr storiface.Work
a.waiting--
}
a.add(tt, wr.Resources, r)
a.Add(tt, wr.Resources, r)
err := cb()
a.free(tt, wr.Resources, r)
a.Free(tt, wr.Resources, r)
return err
}
@ -52,7 +52,7 @@ func (a *activeResources) hasWorkWaiting() bool {
}
// add task resources to activeResources and return utilization difference
func (a *activeResources) add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
func (a *activeResources) Add(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) float64 {
startUtil := a.utilization(wr)
if r.GPUUtilization > 0 {
@ -66,7 +66,7 @@ func (a *activeResources) add(tt sealtasks.SealTaskType, wr storiface.WorkerReso
return a.utilization(wr) - startUtil
}
func (a *activeResources) free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
func (a *activeResources) Free(tt sealtasks.SealTaskType, wr storiface.WorkerResources, r storiface.Resources) {
if r.GPUUtilization > 0 {
a.gpuUsed -= r.GPUUtilization
}
@ -80,9 +80,9 @@ func (a *activeResources) free(tt sealtasks.SealTaskType, wr storiface.WorkerRes
}
}
// canHandleRequest evaluates if the worker has enough available resources to
// CanHandleRequest evaluates if the worker has enough available resources to
// handle the request.
func (a *activeResources) canHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
func (a *activeResources) CanHandleRequest(tt sealtasks.SealTaskType, needRes storiface.Resources, wid storiface.WorkerID, caller string, info storiface.WorkerInfo) bool {
if needRes.MaxConcurrent > 0 {
if a.taskCounters[tt] >= needRes.MaxConcurrent {
log.Debugf("sched: not scheduling on worker %s for %s; at task limit tt=%s, curcount=%d", wid, caller, tt, a.taskCounters[tt])
@ -172,14 +172,14 @@ func (a *activeResources) utilization(wr storiface.WorkerResources) float64 { //
return max
}
func (wh *workerHandle) utilization() float64 {
func (wh *WorkerHandle) Utilization() float64 {
wh.lk.Lock()
u := wh.active.utilization(wh.info.Resources)
u += wh.preparing.utilization(wh.info.Resources)
u := wh.active.utilization(wh.Info.Resources)
u += wh.preparing.utilization(wh.Info.Resources)
wh.lk.Unlock()
wh.wndLk.Lock()
for _, window := range wh.activeWindows {
u += window.allocated.utilization(wh.info.Resources)
u += window.Allocated.utilization(wh.Info.Resources)
}
wh.wndLk.Unlock()
@ -188,7 +188,7 @@ func (wh *workerHandle) utilization() float64 {
var tasksCacheTimeout = 30 * time.Second
func (wh *workerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
wh.tasksLk.Lock()
defer wh.tasksLk.Unlock()

View File

@ -183,7 +183,7 @@ func (s *schedTestWorker) Close() error {
var _ Worker = &schedTestWorker{}
func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
func addTestWorker(t *testing.T, sched *Scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
w := &schedTestWorker{
name: name,
taskTypes: taskTypes,
@ -223,7 +223,8 @@ func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name str
}
func TestSchedStartStop(t *testing.T) {
sched := newScheduler()
sched, err := newScheduler("")
require.NoError(t, err)
go sched.runSched()
addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
@ -259,13 +260,13 @@ func TestSched(t *testing.T) {
wg sync.WaitGroup
}
type task func(*testing.T, *scheduler, *stores.Index, *runMeta)
type task func(*testing.T, *Scheduler, *stores.Index, *runMeta)
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
done := make(chan struct{})
rm.done[taskName] = done
@ -314,7 +315,7 @@ func TestSched(t *testing.T) {
taskStarted := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
@ -326,7 +327,7 @@ func TestSched(t *testing.T) {
taskDone := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
@ -339,7 +340,7 @@ func TestSched(t *testing.T) {
taskNotScheduled := func(name string) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
t.Fatal("not expected", l, l2)
@ -352,7 +353,8 @@ func TestSched(t *testing.T) {
return func(t *testing.T) {
index := stores.NewIndex()
sched := newScheduler()
sched, err := newScheduler("")
require.NoError(t, err)
sched.testSync = make(chan struct{})
go sched.runSched()
@ -378,7 +380,7 @@ func TestSched(t *testing.T) {
}
multTask := func(tasks ...task) task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) {
for _, tsk := range tasks {
tsk(t, s, index, meta)
}
@ -492,7 +494,7 @@ func TestSched(t *testing.T) {
}
diag := func() task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) {
time.Sleep(20 * time.Millisecond)
for _, request := range s.diag().Requests {
log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)
@ -582,12 +584,12 @@ func TestSched(t *testing.T) {
type slowishSelector bool
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) {
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
time.Sleep(200 * time.Microsecond)
return bool(s), nil
return bool(s), false, nil
}
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
time.Sleep(100 * time.Microsecond)
return true, nil
}
@ -604,29 +606,30 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
sched := newScheduler()
sched.workers[storiface.WorkerID{}] = &workerHandle{
sched, err := newScheduler("")
require.NoError(b, err)
sched.Workers[storiface.WorkerID{}] = &WorkerHandle{
workerRpc: nil,
info: storiface.WorkerInfo{
Info: storiface.WorkerInfo{
Hostname: "t",
Resources: decentWorkerResources,
},
preparing: newActiveResources(),
active: newActiveResources(),
preparing: NewActiveResources(),
active: NewActiveResources(),
}
for i := 0; i < windows; i++ {
sched.openWindows = append(sched.openWindows, &schedWindowRequest{
worker: storiface.WorkerID{},
done: make(chan *schedWindow, 1000),
sched.OpenWindows = append(sched.OpenWindows, &SchedWindowRequest{
Worker: storiface.WorkerID{},
Done: make(chan *SchedWindow, 1000),
})
}
for i := 0; i < queue; i++ {
sched.schedQueue.Push(&workerRequest{
taskType: sealtasks.TTCommit2,
sel: slowishSelector(true),
ctx: ctx,
sched.SchedQueue.Push(&WorkerRequest{
TaskType: sealtasks.TTCommit2,
Sel: slowishSelector(true),
Ctx: ctx,
})
}
@ -644,28 +647,28 @@ func BenchmarkTrySched(b *testing.B) {
}
func TestWindowCompact(t *testing.T) {
sh := scheduler{}
sh := Scheduler{}
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) {
return func(t *testing.T) {
wh := &workerHandle{
info: storiface.WorkerInfo{
wh := &WorkerHandle{
Info: storiface.WorkerInfo{
Resources: decentWorkerResources,
},
}
for _, windowTasks := range start {
window := &schedWindow{
allocated: *newActiveResources(),
window := &SchedWindow{
Allocated: *NewActiveResources(),
}
for _, task := range windowTasks {
window.todo = append(window.todo, &workerRequest{
taskType: task,
sector: storage.SectorRef{ProofType: spt},
window.Todo = append(window.Todo, &WorkerRequest{
TaskType: task,
Sector: storage.SectorRef{ProofType: spt},
})
window.allocated.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
window.Allocated.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
@ -680,17 +683,17 @@ func TestWindowCompact(t *testing.T) {
require.Equal(t, len(start)-len(expect), -sw.windowsRequested)
for wi, tasks := range expect {
expectRes := newActiveResources()
expectRes := NewActiveResources()
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
expectRes.add(task.SealTask(spt), wh.info.Resources, storiface.ResourceTable[task][spt])
require.Equal(t, task, wh.activeWindows[wi].Todo[ti].TaskType, "%d, %d", wi, ti)
expectRes.Add(task.SealTask(spt), wh.Info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi)
require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi)
require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi)
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].Allocated.cpuUse, "%d", wi)
require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].Allocated.gpuUsed, "%d", wi)
require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].Allocated.memUsedMin, "%d", wi)
require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].Allocated.memUsedMax, "%d", wi)
}
}

View File

@ -12,31 +12,31 @@ import (
)
type schedWorker struct {
sched *scheduler
worker *workerHandle
sched *Scheduler
worker *WorkerHandle
wid storiface.WorkerID
heartbeatTimer *time.Ticker
scheduledWindows chan *schedWindow
scheduledWindows chan *SchedWindow
taskDone chan struct{}
windowsRequested int
}
func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) {
func newWorkerHandle(ctx context.Context, w Worker) (*WorkerHandle, error) {
info, err := w.Info(ctx)
if err != nil {
return nil, xerrors.Errorf("getting worker info: %w", err)
}
worker := &workerHandle{
worker := &WorkerHandle{
workerRpc: w,
info: info,
Info: info,
preparing: newActiveResources(),
active: newActiveResources(),
enabled: true,
preparing: NewActiveResources(),
active: NewActiveResources(),
Enabled: true,
closingMgr: make(chan struct{}),
closedMgr: make(chan struct{}),
@ -46,9 +46,9 @@ func newWorkerHandle(ctx context.Context, w Worker) (*workerHandle, error) {
}
// context only used for startup
func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, worker *workerHandle) error {
func (sh *Scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, worker *WorkerHandle) error {
sh.workersLk.Lock()
_, exist := sh.workers[wid]
_, exist := sh.Workers[wid]
if exist {
log.Warnw("duplicated worker added", "id", wid)
@ -57,7 +57,7 @@ func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, work
return nil
}
sh.workers[wid] = worker
sh.Workers[wid] = worker
sh.workersLk.Unlock()
sw := &schedWorker{
@ -67,7 +67,7 @@ func (sh *scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, work
wid: wid,
heartbeatTimer: time.NewTicker(stores.HeartbeatInterval),
scheduledWindows: make(chan *schedWindow, SchedWindows),
scheduledWindows: make(chan *SchedWindow, SchedWindows),
taskDone: make(chan struct{}, 1),
windowsRequested: 0,
@ -94,7 +94,7 @@ func (sw *schedWorker) handleWorker() {
}
sched.workersLk.Lock()
delete(sched.workers, sw.wid)
delete(sched.Workers, sw.wid)
sched.workersLk.Unlock()
}()
@ -103,7 +103,7 @@ func (sw *schedWorker) handleWorker() {
for {
{
sched.workersLk.Lock()
enabled := worker.enabled
enabled := worker.Enabled
sched.workersLk.Unlock()
// ask for more windows if we need them (non-blocking)
@ -124,8 +124,8 @@ func (sw *schedWorker) handleWorker() {
// session looks good
{
sched.workersLk.Lock()
enabled := worker.enabled
worker.enabled = true
enabled := worker.Enabled
worker.Enabled = true
sched.workersLk.Unlock()
if !enabled {
@ -248,9 +248,9 @@ func (sw *schedWorker) checkSession(ctx context.Context) bool {
func (sw *schedWorker) requestWindows() bool {
for ; sw.windowsRequested < SchedWindows; sw.windowsRequested++ {
select {
case sw.sched.windowRequests <- &schedWindowRequest{
worker: sw.wid,
done: sw.scheduledWindows,
case sw.sched.windowRequests <- &SchedWindowRequest{
Worker: sw.wid,
Done: sw.scheduledWindows,
}:
case <-sw.sched.closing:
return false
@ -290,21 +290,21 @@ func (sw *schedWorker) workerCompactWindows() {
lower := worker.activeWindows[wi]
var moved []int
for ti, todo := range window.todo {
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if !lower.allocated.canHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.info) {
for ti, todo := range window.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if !lower.Allocated.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "compactWindows", worker.Info) {
continue
}
moved = append(moved, ti)
lower.todo = append(lower.todo, todo)
lower.allocated.add(todo.SealTask(), worker.info.Resources, needRes)
window.allocated.free(todo.SealTask(), worker.info.Resources, needRes)
lower.Todo = append(lower.Todo, todo)
lower.Allocated.Add(todo.SealTask(), worker.Info.Resources, needRes)
window.Allocated.Free(todo.SealTask(), worker.Info.Resources, needRes)
}
if len(moved) > 0 {
newTodo := make([]*workerRequest, 0, len(window.todo)-len(moved))
for i, t := range window.todo {
newTodo := make([]*WorkerRequest, 0, len(window.Todo)-len(moved))
for i, t := range window.Todo {
if len(moved) > 0 && moved[0] == i {
moved = moved[1:]
continue
@ -312,16 +312,16 @@ func (sw *schedWorker) workerCompactWindows() {
newTodo = append(newTodo, t)
}
window.todo = newTodo
window.Todo = newTodo
}
}
}
var compacted int
var newWindows []*schedWindow
var newWindows []*SchedWindow
for _, window := range worker.activeWindows {
if len(window.todo) == 0 {
if len(window.Todo) == 0 {
compacted++
continue
}
@ -347,13 +347,13 @@ assignLoop:
firstWindow := worker.activeWindows[0]
// process tasks within a window, preferring tasks at lower indexes
for len(firstWindow.todo) > 0 {
for len(firstWindow.Todo) > 0 {
tidx := -1
worker.lk.Lock()
for t, todo := range firstWindow.todo {
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if worker.preparing.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
for t, todo := range firstWindow.Todo {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@ -364,9 +364,9 @@ assignLoop:
break assignLoop
}
todo := firstWindow.todo[tidx]
todo := firstWindow.Todo[tidx]
log.Debugf("assign worker sector %d", todo.sector.ID.Number)
log.Debugf("assign worker sector %d", todo.Sector.ID.Number)
err := sw.startProcessingTask(todo)
if err != nil {
@ -375,9 +375,9 @@ assignLoop:
}
// Note: we're not freeing window.allocated resources here very much on purpose
copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
firstWindow.todo[len(firstWindow.todo)-1] = nil
firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
copy(firstWindow.Todo[tidx:], firstWindow.Todo[tidx+1:])
firstWindow.Todo[len(firstWindow.Todo)-1] = nil
firstWindow.Todo = firstWindow.Todo[:len(firstWindow.Todo)-1]
}
copy(worker.activeWindows, worker.activeWindows[1:])
@ -405,16 +405,16 @@ assignLoop:
firstWindow := worker.activeWindows[0]
// process tasks within a window, preferring tasks at lower indexes
for len(firstWindow.todo) > 0 {
for len(firstWindow.Todo) > 0 {
tidx := -1
for t, todo := range firstWindow.todo {
if todo.taskType != sealtasks.TTCommit1 && todo.taskType != sealtasks.TTCommit2 { // todo put in task
for t, todo := range firstWindow.Todo {
if todo.TaskType != sealtasks.TTCommit1 && todo.TaskType != sealtasks.TTCommit2 { // todo put in task
continue
}
needRes := worker.info.Resources.ResourceSpec(todo.sector.ProofType, todo.taskType)
if worker.active.canHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.info) {
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
if worker.active.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
tidx = t
break
}
@ -424,9 +424,9 @@ assignLoop:
break assignLoop
}
todo := firstWindow.todo[tidx]
todo := firstWindow.Todo[tidx]
log.Debugf("assign worker sector %d (ready)", todo.sector.ID.Number)
log.Debugf("assign worker sector %d (ready)", todo.Sector.ID.Number)
err := sw.startProcessingReadyTask(todo)
if err != nil {
@ -435,9 +435,9 @@ assignLoop:
}
// Note: we're not freeing window.allocated resources here very much on purpose
copy(firstWindow.todo[tidx:], firstWindow.todo[tidx+1:])
firstWindow.todo[len(firstWindow.todo)-1] = nil
firstWindow.todo = firstWindow.todo[:len(firstWindow.todo)-1]
copy(firstWindow.Todo[tidx:], firstWindow.Todo[tidx+1:])
firstWindow.Todo[len(firstWindow.Todo)-1] = nil
firstWindow.Todo = firstWindow.Todo[:len(firstWindow.Todo)-1]
}
copy(worker.activeWindows, worker.activeWindows[1:])
@ -448,24 +448,24 @@ assignLoop:
}
}
func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
w, sh := sw.worker, sw.sched
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
w.lk.Lock()
w.preparing.add(req.SealTask(), w.info.Resources, needRes)
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
go func() {
// first run the prepare step (e.g. fetching sector data from other worker)
tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
tw.start()
err := req.prepare(req.ctx, tw)
err := req.prepare(req.Ctx, tw)
w.lk.Lock()
if err != nil {
w.preparing.free(req.SealTask(), w.info.Resources, needRes)
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
select {
@ -477,7 +477,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
case <-req.Ctx.Done():
log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
case <-sh.closing:
log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
@ -485,17 +485,17 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
return
}
tw = sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
tw = sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
// start tracking work first early in case we need to wait for resources
werr := make(chan error, 1)
go func() {
werr <- req.work(req.ctx, tw)
werr <- req.work(req.Ctx, tw)
}()
// wait (if needed) for resources in the 'active' window
err = w.active.withResources(sw.wid, w.info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.free(req.SealTask(), w.info.Resources, needRes)
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
w.lk.Unlock()
defer w.lk.Lock() // we MUST return locked from this function
@ -511,7 +511,7 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
case <-req.Ctx.Done():
log.Warnf("request got cancelled before we could respond")
case <-sh.closing:
log.Warnf("scheduler closed while sending response")
@ -531,22 +531,22 @@ func (sw *schedWorker) startProcessingTask(req *workerRequest) error {
return nil
}
func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
func (sw *schedWorker) startProcessingReadyTask(req *WorkerRequest) error {
w, sh := sw.worker, sw.sched
needRes := w.info.Resources.ResourceSpec(req.sector.ProofType, req.taskType)
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
w.active.add(req.SealTask(), w.info.Resources, needRes)
w.active.Add(req.SealTask(), w.Info.Resources, needRes)
go func() {
// Do the work!
tw := sh.workTracker.worker(sw.wid, w.info, w.workerRpc)
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
tw.start()
err := req.work(req.ctx, tw)
err := req.work(req.Ctx, tw)
select {
case req.ret <- workerResponse{err: err}:
case <-req.ctx.Done():
case <-req.Ctx.Done():
log.Warnf("request got cancelled before we could respond")
case <-sh.closing:
log.Warnf("scheduler closed while sending response")
@ -554,7 +554,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
w.lk.Lock()
w.active.free(req.SealTask(), w.info.Resources, needRes)
w.active.Free(req.SealTask(), w.Info.Resources, needRes)
select {
case sw.taskDone <- struct{}{}:
@ -574,7 +574,7 @@ func (sw *schedWorker) startProcessingReadyTask(req *workerRequest) error {
return nil
}
func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
func (sh *Scheduler) workerCleanup(wid storiface.WorkerID, w *WorkerHandle) {
select {
case <-w.closingMgr:
default:
@ -592,13 +592,13 @@ func (sh *scheduler) workerCleanup(wid storiface.WorkerID, w *workerHandle) {
if !w.cleanupStarted {
w.cleanupStarted = true
newWindows := make([]*schedWindowRequest, 0, len(sh.openWindows))
for _, window := range sh.openWindows {
if window.worker != wid {
newWindows := make([]*SchedWindowRequest, 0, len(sh.OpenWindows))
for _, window := range sh.OpenWindows {
if window.Worker != wid {
newWindows = append(newWindows, window)
}
}
sh.openWindows = newWindows
sh.OpenWindows = newWindows
log.Debugf("worker %s dropped", wid)
}

View File

@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
}
}
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
have := map[storiface.ID]struct{}{}
@ -47,25 +47,25 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
if err != nil {
return false, xerrors.Errorf("finding best alloc storage: %w", err)
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
}
for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}
return false, nil
return false, false, nil
}
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
return a.utilization() < b.utilization(), nil
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}
var _ WorkerSelector = &allocSelector{}

View File

@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
}
}
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, nil
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
if err != nil {
return false, xerrors.Errorf("getting worker paths: %w", err)
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
have := map[storiface.ID]struct{}{}
@ -49,25 +49,25 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
ssize, err := spt.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
return false, false, xerrors.Errorf("getting sector size: %w", err)
}
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
if err != nil {
return false, xerrors.Errorf("finding best storage: %w", err)
return false, false, xerrors.Errorf("finding best storage: %w", err)
}
for _, info := range best {
if _, ok := have[info.ID]; ok {
return true, nil
return true, false, nil
}
}
return false, nil
return false, false, nil
}
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
return a.utilization() < b.utilization(), nil
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}
var _ WorkerSelector = &existingSelector{}

98
extern/sector-storage/selector_move.go vendored Normal file
View File

@ -0,0 +1,98 @@
package sectorstorage
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
)
// moveSelector selects a worker for tasks that move sector data, preferring
// workers which already hold the sector in a local path so the move becomes
// local (or a no-op). When allowRemote is false, only such workers qualify.
type moveSelector struct {
	index       stores.SectorIndex       // storage index used to locate the sector and candidate destinations
	sector      abi.SectorID             // sector being moved
	alloc       storiface.SectorFileType // sector file types to move
	destPtype   storiface.PathType       // path type required at the destination
	allowRemote bool                     // permit workers that do NOT already hold the sector locally
}
// newMoveSelector constructs a moveSelector for moving the given sector's
// files of type alloc to storage of type destPtype. When allowRemote is
// false, only workers that already hold the sector locally are eligible.
func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
	sel := moveSelector{
		index:       index,
		sector:      sector,
		alloc:       alloc,
		destPtype:   destPtype,
		allowRemote: allowRemote,
	}
	return &sel
}
// Ok reports whether the worker can run the given move task. The second
// return value is a preference flag: it is true when the worker already has
// the sector in one of its local paths, in which case the move is either a
// no-op (sector already in the right path) or a purely local move.
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
	tasks, err := whnd.TaskTypes(ctx)
	if err != nil {
		return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
	}
	if _, supported := tasks[task]; !supported {
		return false, false, nil
	}

	paths, err := whnd.workerRpc.Paths(ctx)
	if err != nil {
		return false, false, xerrors.Errorf("getting worker paths: %w", err)
	}

	// Storage IDs accessible to this worker; the value counts how many of
	// those paths already contain the sector. Pre-sized: one entry per path.
	workerPaths := make(map[storiface.ID]int, len(paths))
	for _, path := range paths {
		workerPaths[path.ID] = 0
	}

	ssize, err := spt.SectorSize()
	if err != nil {
		return false, false, xerrors.Errorf("getting sector size: %w", err)
	}

	// note: allowFetch is always false here, because we want to find workers with
	// the sector available locally
	preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
	if err != nil {
		return false, false, xerrors.Errorf("finding preferred storage: %w", err)
	}

	for _, info := range preferred {
		if _, ok := workerPaths[info.ID]; ok {
			workerPaths[info.ID]++
		}
	}

	best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
	if err != nil {
		return false, false, xerrors.Errorf("finding best dest storage: %w", err)
	}

	var ok bool
	for _, info := range best {
		if n, has := workerPaths[info.ID]; has {
			ok = true

			// If the worker has a local path with the sector already in it,
			// prefer that worker; this usually means that the move operation is
			// either a no-op because the sector is already in the correct path,
			// or the move is a local move.
			if n > 0 {
				return true, true, nil
			}
		}
	}

	return ok && s.allowRemote, false, nil
}
// Cmp prefers the less-utilized of the two candidate workers.
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
	aLessBusy := a.Utilization() < b.Utilization()
	return aLessBusy, nil
}

var _ WorkerSelector = &moveSelector{}

View File

@ -19,17 +19,17 @@ func newTaskSelector() *taskSelector {
return &taskSelector{}
}
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
_, supported := tasks[task]
return supported, nil
return supported, false, nil
}
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) {
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
atasks, err := a.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -43,7 +43,7 @@ func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *work
return len(atasks) < len(btasks), nil // prefer workers which can do less
}
return a.utilization() < b.utilization(), nil
return a.Utilization() < b.Utilization(), nil
}
var _ WorkerSelector = &taskSelector{}

View File

@ -15,7 +15,7 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
out := map[uuid.UUID]storiface.WorkerStats{}
cb := func(ctx context.Context, id storiface.WorkerID, handle *workerHandle) {
cb := func(ctx context.Context, id storiface.WorkerID, handle *WorkerHandle) {
handle.lk.Lock()
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
@ -32,9 +32,9 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
}
out[uuid.UUID(id)] = storiface.WorkerStats{
Info: handle.info,
Info: handle.Info,
Tasks: taskList,
Enabled: handle.enabled,
Enabled: handle.Enabled,
MemUsedMin: handle.active.memUsedMin,
MemUsedMax: handle.active.memUsedMax,
GpuUsed: handle.active.gpuUsed,
@ -50,7 +50,7 @@ func (m *Manager) WorkerStats(ctx context.Context) map[uuid.UUID]storiface.Worke
handle.lk.Unlock()
}
for id, handle := range m.sched.workers {
for id, handle := range m.sched.Workers {
cb(ctx, id, handle)
}
@ -79,14 +79,14 @@ func (m *Manager) WorkerJobs() map[uuid.UUID][]storiface.WorkerJob {
m.sched.workersLk.RLock()
for id, handle := range m.sched.workers {
for id, handle := range m.sched.Workers {
handle.wndLk.Lock()
for wi, window := range handle.activeWindows {
for _, request := range window.todo {
for _, request := range window.Todo {
out[uuid.UUID(id)] = append(out[uuid.UUID(id)], storiface.WorkerJob{
ID: storiface.UndefCall,
Sector: request.sector.ID,
Task: request.taskType,
Sector: request.Sector.ID,
Task: request.TaskType,
RunWait: wi + 2,
Start: request.start,
})

View File

@ -570,6 +570,8 @@ func (n *Ensemble) Start() *Ensemble {
}
noLocal := m.options.minerNoLocalSealing
assigner := m.options.minerAssigner
disallowRemoteFinalize := m.options.disallowRemoteFinalize
var mineBlock = make(chan lotusminer.MineReq)
opts := []node.Option{
@ -595,6 +597,8 @@ func (n *Ensemble) Start() *Ensemble {
scfg.Storage.AllowCommit = false
}
scfg.Storage.Assigner = assigner
scfg.Storage.DisallowRemoteFinalize = disallowRemoteFinalize
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
return scfg.StorageManager()
}),

View File

@ -34,13 +34,15 @@ type nodeOpts struct {
ownerKey *wallet.Key
extraNodeOpts []node.Option
subsystems MinerSubsystem
mainMiner *TestMiner
disableLibp2p bool
optBuilders []OptBuilder
sectorSize abi.SectorSize
maxStagingDealsBytes int64
minerNoLocalSealing bool // use worker
subsystems MinerSubsystem
mainMiner *TestMiner
disableLibp2p bool
optBuilders []OptBuilder
sectorSize abi.SectorSize
maxStagingDealsBytes int64
minerNoLocalSealing bool // use worker
minerAssigner string
disallowRemoteFinalize bool
workerTasks []sealtasks.TaskType
workerStorageOpt func(stores.Store) stores.Store
@ -97,6 +99,20 @@ func WithNoLocalSealing(nope bool) NodeOpt {
}
}
func WithAssigner(a string) NodeOpt {
return func(opts *nodeOpts) error {
opts.minerAssigner = a
return nil
}
}
func WithDisallowRemoteFinalize(d bool) NodeOpt {
return func(opts *nodeOpts) error {
opts.disallowRemoteFinalize = d
return nil
}
}
func DisableLibp2p() NodeOpt {
return func(opts *nodeOpts) error {
opts.disableLibp2p = true

View File

@ -41,6 +41,38 @@ func TestWorkerPledge(t *testing.T) {
miner.PledgeSectors(ctx, 1, 0, nil)
}
func TestWorkerPledgeSpread(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}),
kit.WithAssigner("spread"),
) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
miner.PledgeSectors(ctx, 1, 0, nil)
}
func TestWorkerPledgeLocalFin(t *testing.T) {
ctx := context.Background()
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}),
kit.WithDisallowRemoteFinalize(true),
) // no mock proofs
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
e, err := worker.Enabled(ctx)
require.NoError(t, err)
require.True(t, e)
miner.PledgeSectors(ctx, 1, 0, nil)
}
func TestWorkerDataCid(t *testing.T) {
ctx := context.Background()
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),

View File

@ -159,6 +159,8 @@ func DefaultStorageMiner() *StorageMiner {
// it's the ratio between 10gbit / 1gbit
ParallelFetchLimit: 10,
Assigner: "utilization",
// By default use the hardware resource filtering strategy.
ResourceFiltering: sectorstorage.ResourceFilteringHardware,
},

View File

@ -756,6 +756,30 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
Comment: ``,
},
{
Name: "Assigner",
Type: "string",
Comment: `Assigner specifies the worker assigner to use when scheduling tasks.
"utilization" (default) - assign tasks to workers with lowest utilization.
"spread" - assign tasks to as many distinct workers as possible.`,
},
{
Name: "DisallowRemoteFinalize",
Type: "bool",
Comment: `DisallowRemoteFinalize when set to true will force all Finalize tasks to
run on workers with local access to both long-term storage and the sealing
path containing the sector.
--
WARNING: Only set this if all workers have access to long-term storage
paths. If this flag is enabled, and there are workers without long-term
storage access, sectors will not be moved from them, and Finalize tasks
will appear to be stuck.
--
If you see stuck Finalize tasks after enabling this setting, check
'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'`,
},
{
Name: "ResourceFiltering",
Type: "sectorstorage.ResourceFilteringStrategy",

View File

@ -63,6 +63,9 @@ func (c *StorageMiner) StorageManager() sectorstorage.Config {
AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
AllowRegenSectorKey: c.Storage.AllowRegenSectorKey,
ResourceFiltering: c.Storage.ResourceFiltering,
DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize,
Assigner: c.Storage.Assigner,
ParallelCheckLimit: c.Proving.ParallelCheckLimit,
}

View File

@ -330,6 +330,24 @@ type SealerConfig struct {
AllowProveReplicaUpdate2 bool
AllowRegenSectorKey bool
// Assigner specifies the worker assigner to use when scheduling tasks.
// "utilization" (default) - assign tasks to workers with lowest utilization.
// "spread" - assign tasks to as many distinct workers as possible.
Assigner string
// DisallowRemoteFinalize when set to true will force all Finalize tasks to
// run on workers with local access to both long-term storage and the sealing
// path containing the sector.
// --
// WARNING: Only set this if all workers have access to long-term storage
// paths. If this flag is enabled, and there are workers without long-term
// storage access, sectors will not be moved from them, and Finalize tasks
// will appear to be stuck.
// --
// If you see stuck Finalize tasks after enabling this setting, check
// 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
DisallowRemoteFinalize bool
// ResourceFiltering instructs the system which resource filtering strategy
// to use when evaluating tasks against this worker. An empty value defaults
// to "hardware".