feat: sched: Cache worker calls

This commit is contained in:
Łukasz Magiera 2022-11-28 17:21:56 +01:00
parent d82b2a5804
commit 5881edb75e
9 changed files with 149 additions and 42 deletions

50
lib/lazy/getonce.go Normal file
View File

@ -0,0 +1,50 @@
package lazy
import (
"context"
"sync"
)
type Lazy[T any] struct {
Get func() (T, error)
once sync.Once
val T
err error
}
func MakeLazy[T any](get func() (T, error)) *Lazy[T] {
return &Lazy[T]{
Get: get,
}
}
func (l *Lazy[T]) Val() (T, error) {
l.once.Do(func() {
l.val, l.err = l.Get()
})
return l.val, l.err
}
type LazyCtx[T any] struct {
Get func(context.Context) (T, error)
once sync.Once
val T
err error
}
func MakeLazyCtx[T any](get func(ctx context.Context) (T, error)) *LazyCtx[T] {
return &LazyCtx[T]{
Get: get,
}
}
func (l *LazyCtx[T]) Val(ctx context.Context) (T, error) {
l.once.Do(func() {
l.val, l.err = l.Get(ctx)
})
return l.val, l.err
}

View File

@ -43,12 +43,18 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error
type SchedWorker interface {
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
Paths(context.Context) ([]storiface.StoragePath, error)
Utilization() float64
}
type WorkerSelector interface {
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error)
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b
}
type Scheduler struct {
@ -81,11 +87,7 @@ type Scheduler struct {
type WorkerHandle struct {
workerRpc Worker
tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex
Info storiface.WorkerInfo
preparing *ActiveResources // use with WorkerHandle.lk

View File

@ -2,6 +2,7 @@ package sealer
import (
"context"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"math/rand"
"sort"
"sync"
@ -37,6 +38,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
*/
cachedWorkers := &schedWorkerCache{
Workers: sh.Workers,
cached: map[storiface.WorkerID]*cachedSchedWorker{},
}
windowsLen := len(sh.OpenWindows)
queueLen := sh.SchedQueue.Len()
@ -81,7 +87,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
var havePreferred bool
for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
worker, ok := cachedWorkers.Get(windowRequest.Worker)
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker)
// TODO: How to move forward here?
@ -143,8 +149,8 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}
wi := sh.Workers[wii]
wj := sh.Workers[wji]
wi, _ := cachedWorkers.Get(wii)
wj, _ := cachedWorkers.Get(wji)
rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
defer cancel()

View File

@ -1,12 +1,9 @@
package sealer
import (
"context"
"sync"
"time"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
"sync"
)
type ActiveResources struct {
@ -185,20 +182,3 @@ func (wh *WorkerHandle) Utilization() float64 {
return u
}
var tasksCacheTimeout = 30 * time.Second
func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
wh.tasksLk.Lock()
defer wh.tasksLk.Unlock()
if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
if err != nil {
return nil, err
}
wh.tasksUpdate = time.Now()
}
return wh.tasksCache, nil
}

View File

@ -0,0 +1,69 @@
package sealer
import (
"context"
"sync"
"github.com/filecoin-project/lotus/lib/lazy"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// schedWorkerCache caches scheduling-related calls to workers
type schedWorkerCache struct {
Workers map[storiface.WorkerID]*WorkerHandle
lk sync.Mutex
cached map[storiface.WorkerID]*cachedSchedWorker
}
func (s *schedWorkerCache) Get(id storiface.WorkerID) (*cachedSchedWorker, bool) {
s.lk.Lock()
defer s.lk.Unlock()
if _, found := s.cached[id]; !found {
if _, found := s.Workers[id]; !found {
return nil, false
}
whnd := s.Workers[id]
s.cached[id] = &cachedSchedWorker{
tt: lazy.MakeLazyCtx(whnd.workerRpc.TaskTypes),
paths: lazy.MakeLazyCtx(whnd.workerRpc.Paths),
utilization: lazy.MakeLazy(func() (float64, error) {
return whnd.Utilization(), nil
}),
Enabled: whnd.Enabled,
Info: whnd.Info,
}
}
return s.cached[id], true
}
type cachedSchedWorker struct {
tt *lazy.LazyCtx[map[sealtasks.TaskType]struct{}]
paths *lazy.LazyCtx[[]storiface.StoragePath]
utilization *lazy.Lazy[float64]
Enabled bool
Info storiface.WorkerInfo
}
func (c *cachedSchedWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return c.tt.Val(ctx)
}
func (c *cachedSchedWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
return c.paths.Get(ctx)
}
func (c *cachedSchedWorker) Utilization() float64 {
// can't error
v, _ := c.utilization.Val()
return v
}
var _ SchedWorker = &cachedSchedWorker{}

View File

@ -26,7 +26,7 @@ func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, p
}
}
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -71,7 +71,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return requested == storiface.FTNone, false, nil
}
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -28,7 +28,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto
}
}
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -78,7 +78,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return requested == storiface.FTNone, false, nil
}
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -30,7 +30,7 @@ func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storifa
}
}
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -39,7 +39,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -99,7 +99,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return (ok && s.allowRemote) || pref, pref, nil
}
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector {
return &taskSelector{}
}
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return supported, false, nil
}
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b SchedWorker) (bool, error) {
atasks, err := a.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)