Merge pull request #9737 from filecoin-project/feat/cache-sched-worker-calls

feat: sched: Cache worker calls
This commit is contained in:
Łukasz Magiera 2022-11-29 12:58:46 +01:00 committed by GitHub
commit a0422f6467
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 175 additions and 44 deletions

50
lib/lazy/getonce.go Normal file
View File

@ -0,0 +1,50 @@
package lazy
import (
"context"
"sync"
)
// Lazy defers a computation until its result is first requested through Val
// and then keeps the outcome (value and error) for every later request.
//
// Get is the underlying computation. Val runs it at most once; calling Get
// directly bypasses the memoization.
type Lazy[T any] struct {
	Get func() (T, error)

	once sync.Once
	val  T
	err  error
}

// MakeLazy wraps get in a Lazy. get is not invoked until the first Val call.
func MakeLazy[T any](get func() (T, error)) *Lazy[T] {
	lz := new(Lazy[T])
	lz.Get = get
	return lz
}

// Val returns the memoized result of Get, running Get on the first call only.
// An error returned by Get is memoized exactly like a value: Get is not
// retried on later calls.
func (l *Lazy[T]) Val() (T, error) {
	l.once.Do(l.resolve)
	return l.val, l.err
}

// resolve runs Get and records its result. It is only invoked through once.
func (l *Lazy[T]) resolve() {
	l.val, l.err = l.Get()
}
// LazyCtx is the context-aware counterpart of Lazy: the wrapped computation
// takes a context.Context, runs at most once through Val, and its outcome
// (value and error) is kept for every later request.
//
// Get is the underlying computation; calling it directly bypasses the
// memoization.
type LazyCtx[T any] struct {
	Get func(context.Context) (T, error)

	once sync.Once
	val  T
	err  error
}

// MakeLazyCtx wraps get in a LazyCtx. get is not invoked until the first Val
// call.
func MakeLazyCtx[T any](get func(ctx context.Context) (T, error)) *LazyCtx[T] {
	lz := new(LazyCtx[T])
	lz.Get = get
	return lz
}

// Val returns the memoized result of Get, running Get on the first call only.
//
// Only the context of the call that actually triggers Get is used; contexts
// passed by later calls are ignored. An error returned by Get is memoized
// exactly like a value, so Get is not retried.
func (l *LazyCtx[T]) Val(ctx context.Context) (T, error) {
	resolve := func() {
		l.val, l.err = l.Get(ctx)
	}
	l.once.Do(resolve)
	return l.val, l.err
}

View File

@ -43,12 +43,18 @@ const mib = 1 << 20
type WorkerAction func(ctx context.Context, w Worker) error
// SchedWorker is the view of a worker that WorkerSelector.Ok and
// WorkerSelector.Cmp receive.
type SchedWorker interface {
// TaskTypes returns the set of task types for the worker, or an error.
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
// Paths returns the worker's storage paths, or an error.
Paths(context.Context) ([]storiface.StoragePath, error)
// Utilization returns a utilization figure for the worker; the selectors'
// Cmp implementations compare it between two workers.
Utilization() float64
}
type WorkerSelector interface {
// Ok is true if worker is acceptable for performing a task.
// If any worker is preferred for a task, other workers won't be considered for that task.
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (ok, preferred bool, err error)
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) // true if a is preferred over b
}
type Scheduler struct {
@ -82,10 +88,6 @@ type Scheduler struct {
type WorkerHandle struct {
workerRpc Worker
tasksCache map[sealtasks.TaskType]struct{}
tasksUpdate time.Time
tasksLk sync.Mutex
Info storiface.WorkerInfo
preparing *ActiveResources // use with WorkerHandle.lk

View File

@ -9,6 +9,7 @@ import (
"go.opencensus.io/stats"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
type WindowSelector func(sh *Scheduler, queueLen int, acceptableWindows [][]int, windows []SchedWindow) int
@ -37,6 +38,11 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
*/
cachedWorkers := &schedWorkerCache{
Workers: sh.Workers,
cached: map[storiface.WorkerID]*cachedSchedWorker{},
}
windowsLen := len(sh.OpenWindows)
queueLen := sh.SchedQueue.Len()
@ -61,6 +67,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
partDone := metrics.Timer(sh.mctx, metrics.SchedAssignerCandidatesDuration)
defer func() {
// call latest value of partDone in case we error out somewhere
partDone()
}()
@ -81,7 +88,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
var havePreferred bool
for wnd, windowRequest := range sh.OpenWindows {
worker, ok := sh.Workers[windowRequest.Worker]
worker, ok := cachedWorkers.Get(windowRequest.Worker)
if !ok {
log.Errorf("worker referenced by windowRequest not found (worker: %s)", windowRequest.Worker)
// TODO: How to move forward here?
@ -143,8 +150,8 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
return acceptableWindows[sqi][i] < acceptableWindows[sqi][j] // nolint:scopelint
}
wi := sh.Workers[wii]
wj := sh.Workers[wji]
wi, _ := cachedWorkers.Get(wii)
wj, _ := cachedWorkers.Get(wji)
rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
defer cancel()

View File

@ -1,9 +1,7 @@
package sealer
import (
"context"
"sync"
"time"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -185,20 +183,3 @@ func (wh *WorkerHandle) Utilization() float64 {
return u
}
var tasksCacheTimeout = 30 * time.Second
func (wh *WorkerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
wh.tasksLk.Lock()
defer wh.tasksLk.Unlock()
if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
if err != nil {
return nil, err
}
wh.tasksUpdate = time.Now()
}
return wh.tasksCache, nil
}

View File

@ -19,6 +19,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
prooftypes "github.com/filecoin-project/go-state-types/proof"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
@ -587,18 +588,28 @@ func TestSched(t *testing.T) {
type slowishSelector bool
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
time.Sleep(200 * time.Microsecond)
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a SchedWorker) (bool, bool, error) {
// note: we don't care about output here, just the time those calls take
// (selector Ok/Cmp is called in the scheduler)
_, _ = a.Paths(ctx)
_, _ = a.TaskTypes(ctx)
return bool(s), false, nil
}
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
time.Sleep(100 * time.Microsecond)
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
// note: we don't care about output here, just the time those calls take
// (selector Ok/Cmp is called in the scheduler)
_, _ = a.Paths(ctx)
return true, nil
}
var _ WorkerSelector = slowishSelector(true)
// tw bundles an api.Worker with an io.Closer. BenchmarkTrySched stores a *tw
// (with only Worker set) in WorkerHandle.workerRpc.
// NOTE(review): presumably the embedded io.Closer exists so *tw satisfies the
// Worker interface — confirm against the Worker definition.
type tw struct {
api.Worker
io.Closer
}
func BenchmarkTrySched(b *testing.B) {
logging.SetAllLoggers(logging.LevelInfo)
defer logging.SetAllLoggers(logging.LevelDebug)
@ -609,14 +620,25 @@ func BenchmarkTrySched(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
var whnd api.WorkerStruct
whnd.Internal.TaskTypes = func(p0 context.Context) (map[sealtasks.TaskType]struct{}, error) {
time.Sleep(100 * time.Microsecond)
return nil, nil
}
whnd.Internal.Paths = func(p0 context.Context) ([]storiface.StoragePath, error) {
time.Sleep(100 * time.Microsecond)
return nil, nil
}
sched, err := newScheduler(ctx, "")
require.NoError(b, err)
sched.Workers[storiface.WorkerID{}] = &WorkerHandle{
workerRpc: nil,
workerRpc: &tw{Worker: &whnd},
Info: storiface.WorkerInfo{
Hostname: "t",
Resources: decentWorkerResources,
},
Enabled: true,
preparing: NewActiveResources(),
active: NewActiveResources(),
}

View File

@ -0,0 +1,69 @@
package sealer
import (
"context"
"sync"
"github.com/filecoin-project/lotus/lib/lazy"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
// schedWorkerCache caches scheduling-related calls to workers.
//
// Workers is the worker set that entries are built from. cached holds one
// cachedSchedWorker per worker ID already requested through Get, and is
// guarded by lk.
type schedWorkerCache struct {
	Workers map[storiface.WorkerID]*WorkerHandle

	lk     sync.Mutex
	cached map[storiface.WorkerID]*cachedSchedWorker
}

// Get returns the cached view of the worker with the given id, creating it on
// the first request. The boolean result is false (and the view nil) when id is
// present in neither the cache nor Workers.
func (s *schedWorkerCache) Get(id storiface.WorkerID) (*cachedSchedWorker, bool) {
	s.lk.Lock()
	defer s.lk.Unlock()

	if entry, hit := s.cached[id]; hit {
		return entry, true
	}

	handle, known := s.Workers[id]
	if !known {
		return nil, false
	}

	// Nothing is fetched from the worker here: each lazy wrapper performs its
	// call on first use. Enabled and Info are copied from the handle now.
	entry := &cachedSchedWorker{
		tt:    lazy.MakeLazyCtx(handle.workerRpc.TaskTypes),
		paths: lazy.MakeLazyCtx(handle.workerRpc.Paths),
		utilization: lazy.MakeLazy(func() (float64, error) {
			return handle.Utilization(), nil
		}),

		Enabled: handle.Enabled,
		Info:    handle.Info,
	}
	s.cached[id] = entry

	return entry, true
}
// cachedSchedWorker is a SchedWorker whose TaskTypes, Paths and Utilization
// results are each obtained at most once (through the lazy wrappers) and then
// reused for every later call.
//
// Enabled and Info are plain copies of the corresponding WorkerHandle fields,
// taken when the entry is created.
type cachedSchedWorker struct {
	tt          *lazy.LazyCtx[map[sealtasks.TaskType]struct{}]
	paths       *lazy.LazyCtx[[]storiface.StoragePath]
	utilization *lazy.Lazy[float64]

	Enabled bool
	Info    storiface.WorkerInfo
}

// TaskTypes returns the worker's task types, calling the worker only on the
// first invocation. The result of that first call (including an error) is
// returned to all later callers.
func (c *cachedSchedWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
	return c.tt.Val(ctx)
}

// Paths returns the worker's storage paths, calling the worker only on the
// first invocation. The result of that first call (including an error) is
// returned to all later callers.
func (c *cachedSchedWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
	// Val, not Get: Get is the raw underlying call and would hit the worker
	// on every invocation, defeating the cache.
	return c.paths.Val(ctx)
}

// Utilization returns the worker's utilization as computed on the first call.
func (c *cachedSchedWorker) Utilization() float64 {
	// can't error: the wrapped function always returns a nil error
	v, _ := c.utilization.Val()
	return v
}

var _ SchedWorker = &cachedSchedWorker{}

View File

@ -26,7 +26,7 @@ func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, p
}
}
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -35,7 +35,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -71,7 +71,7 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
return requested == storiface.FTNone, false, nil
}
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -28,7 +28,7 @@ func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc sto
}
}
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -37,7 +37,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -78,7 +78,7 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
return requested == storiface.FTNone, false, nil
}
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -30,7 +30,7 @@ func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storifa
}
}
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -39,7 +39,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return false, false, nil
}
paths, err := whnd.workerRpc.Paths(ctx)
paths, err := whnd.Paths(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
@ -99,7 +99,7 @@ func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return (ok && s.allowRemote) || pref, pref, nil
}
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
return a.Utilization() < b.Utilization(), nil
}

View File

@ -19,7 +19,7 @@ func newTaskSelector() *taskSelector {
return &taskSelector{}
}
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
@ -29,7 +29,7 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
return supported, false, nil
}
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b SchedWorker) (bool, error) {
atasks, err := a.TaskTypes(ctx)
if err != nil {
return false, xerrors.Errorf("getting supported worker task types: %w", err)