diff --git a/extern/sector-storage/sched.go b/extern/sector-storage/sched.go index d7d7d3265..757e89299 100644 --- a/extern/sector-storage/sched.go +++ b/extern/sector-storage/sched.go @@ -76,6 +76,10 @@ type scheduler struct { type workerHandle struct { workerRpc Worker + tasksCache map[sealtasks.TaskType]struct{} + tasksUpdate time.Time + tasksLk sync.Mutex + info storiface.WorkerInfo preparing *activeResources // use with workerHandle.lk diff --git a/extern/sector-storage/sched_resources.go b/extern/sector-storage/sched_resources.go index 5f7f1cfb8..6034dc2e9 100644 --- a/extern/sector-storage/sched_resources.go +++ b/extern/sector-storage/sched_resources.go @@ -1,8 +1,11 @@ package sectorstorage import ( + "context" "sync" + "time" + "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/storiface" ) @@ -142,3 +145,20 @@ func (wh *workerHandle) utilization() float64 { return u } + +var tasksCacheTimeout = 30 * time.Second + +func (wh *workerHandle) TaskTypes(ctx context.Context) (t map[sealtasks.TaskType]struct{}, err error) { + wh.lk.Lock() + defer wh.lk.Unlock() + + if wh.tasksCache == nil || time.Now().Sub(wh.tasksUpdate) > tasksCacheTimeout { + wh.tasksCache, err = wh.workerRpc.TaskTypes(ctx) + if err != nil { + return nil, err + } + wh.tasksUpdate = time.Now() + } + + return wh.tasksCache, nil +} diff --git a/extern/sector-storage/selector_alloc.go b/extern/sector-storage/selector_alloc.go index 3212161af..fe6a11e1e 100644 --- a/extern/sector-storage/selector_alloc.go +++ b/extern/sector-storage/selector_alloc.go @@ -27,7 +27,7 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType, } func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { - tasks, err := whnd.workerRpc.TaskTypes(ctx) + tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) } diff --git a/extern/sector-storage/selector_existing.go b/extern/sector-storage/selector_existing.go index 4c0ba379c..b84991b5c 100644 --- a/extern/sector-storage/selector_existing.go +++ b/extern/sector-storage/selector_existing.go @@ -29,7 +29,7 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st } func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { - tasks, err := whnd.workerRpc.TaskTypes(ctx) + tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) } diff --git a/extern/sector-storage/selector_task.go b/extern/sector-storage/selector_task.go index 15c71b648..94bcb4419 100644 --- a/extern/sector-storage/selector_task.go +++ b/extern/sector-storage/selector_task.go @@ -20,7 +20,7 @@ func newTaskSelector() *taskSelector { } func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) { - tasks, err := whnd.workerRpc.TaskTypes(ctx) + tasks, err := whnd.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) } @@ -30,11 +30,12 @@ func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi. } func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) { - atasks, err := a.workerRpc.TaskTypes(ctx) + atasks, err := a.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) } - btasks, err := b.workerRpc.TaskTypes(ctx) + + btasks, err := b.TaskTypes(ctx) if err != nil { return false, xerrors.Errorf("getting supported worker task types: %w", err) }