feat: sched: Cache worker tasks
This commit is contained in:
parent
f652dd3436
commit
6de4e3d4cd
4
extern/sector-storage/sched.go
vendored
4
extern/sector-storage/sched.go
vendored
@ -76,6 +76,10 @@ type scheduler struct {
|
||||
type workerHandle struct {
|
||||
workerRpc Worker
|
||||
|
||||
tasksCache map[sealtasks.TaskType]struct{}
|
||||
tasksUpdate time.Time
|
||||
tasksLk sync.Mutex
|
||||
|
||||
info storiface.WorkerInfo
|
||||
|
||||
preparing *activeResources // use with workerHandle.lk
|
||||
|
20
extern/sector-storage/sched_resources.go
vendored
20
extern/sector-storage/sched_resources.go
vendored
@ -1,8 +1,11 @@
|
||||
package sectorstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||
)
|
||||
|
||||
@ -142,3 +145,20 @@ func (wh *workerHandle) utilization() float64 {
|
||||
|
||||
return u
|
||||
}
|
||||
|
||||
var tasksCacheTimeout = 30 * time.Second
|
||||
|
||||
func (wh *workerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) {
|
||||
wh.lk.Lock()
|
||||
defer wh.lk.Unlock()
|
||||
|
||||
if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout {
|
||||
wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wh.tasksUpdate = time.Now()
|
||||
}
|
||||
|
||||
return wh.tasksCache, nil
|
||||
}
|
||||
|
2
extern/sector-storage/selector_alloc.go
vendored
2
extern/sector-storage/selector_alloc.go
vendored
@ -27,7 +27,7 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
|
||||
}
|
||||
|
||||
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
|
||||
tasks, err := whnd.workerRpc.TaskTypes(ctx)
|
||||
tasks, err := whnd.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||
}
|
||||
|
2
extern/sector-storage/selector_existing.go
vendored
2
extern/sector-storage/selector_existing.go
vendored
@ -29,7 +29,7 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
|
||||
}
|
||||
|
||||
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
|
||||
tasks, err := whnd.workerRpc.TaskTypes(ctx)
|
||||
tasks, err := whnd.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||
}
|
||||
|
7
extern/sector-storage/selector_task.go
vendored
7
extern/sector-storage/selector_task.go
vendored
@ -20,7 +20,7 @@ func newTaskSelector() *taskSelector {
|
||||
}
|
||||
|
||||
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {
|
||||
tasks, err := whnd.workerRpc.TaskTypes(ctx)
|
||||
tasks, err := whnd.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||
}
|
||||
@ -30,11 +30,12 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.
|
||||
}
|
||||
|
||||
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) {
|
||||
atasks, err := a.workerRpc.TaskTypes(ctx)
|
||||
atasks, err := a.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||
}
|
||||
btasks, err := b.workerRpc.TaskTypes(ctx)
|
||||
|
||||
btasks, err := b.TaskTypes(ctx)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user