lotus/storage/sealer/selector_move.go

107 lines
2.8 KiB
Go
Raw Permalink Normal View History

package sealer
2022-05-23 21:27:28 +00:00
import (
"context"
"golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/storage/paths"
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
2022-05-23 21:27:28 +00:00
)
type moveSelector struct {
index paths.SectorIndex
2022-05-23 21:53:25 +00:00
sector abi.SectorID
alloc storiface.SectorFileType
destPtype storiface.PathType
allowRemote bool
2022-05-23 21:27:28 +00:00
}
func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
2022-05-23 21:27:28 +00:00
return &moveSelector{
2022-05-23 21:53:25 +00:00
index: index,
sector: sector,
alloc: alloc,
destPtype: destPtype,
allowRemote: allowRemote,
2022-05-23 21:27:28 +00:00
}
}
2022-11-28 16:21:56 +00:00
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd SchedWorker) (bool, bool, error) {
2022-05-23 21:27:28 +00:00
tasks, err := whnd.TaskTypes(ctx)
if err != nil {
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
}
if _, supported := tasks[task]; !supported {
return false, false, nil
}
2022-11-28 16:21:56 +00:00
paths, err := whnd.Paths(ctx)
2022-05-23 21:27:28 +00:00
if err != nil {
return false, false, xerrors.Errorf("getting worker paths: %w", err)
}
workerPaths := map[storiface.ID]int{}
for _, path := range paths {
workerPaths[path.ID] = 0
}
ssize, err := spt.SectorSize()
if err != nil {
return false, false, xerrors.Errorf("getting sector size: %w", err)
}
// note: allowFetch is always false here, because we want to find workers with
// the sector available locally
preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
if err != nil {
return false, false, xerrors.Errorf("finding preferred storage: %w", err)
}
for _, info := range preferred {
if _, ok := workerPaths[info.ID]; ok {
workerPaths[info.ID]++
}
}
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
if err != nil {
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
}
2022-07-01 16:02:10 +00:00
var ok, pref bool
requested := s.alloc
2022-05-23 21:27:28 +00:00
for _, info := range best {
if n, has := workerPaths[info.ID]; has {
ok = true
// if the worker has a local path with the sector already in it
// prefer that worker; This usually meant that the move operation is
// either a no-op because the sector is already in the correct path,
// or the move a local move.
if n > 0 {
2022-07-01 16:02:10 +00:00
pref = true
}
requested = requested.SubAllowed(info.AllowTypes, info.DenyTypes)
2022-07-01 16:02:10 +00:00
// got all paths
if requested == storiface.FTNone {
break
2022-05-23 21:27:28 +00:00
}
}
}
2022-07-11 19:47:47 +00:00
return (ok && s.allowRemote) || pref, pref, nil
2022-05-23 21:27:28 +00:00
}
2022-11-28 16:21:56 +00:00
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b SchedWorker) (bool, error) {
2022-05-23 21:27:28 +00:00
return a.Utilization() < b.Utilization(), nil
}
// Compile-time assertion that *moveSelector implements WorkerSelector.
var _ WorkerSelector = (*moveSelector)(nil)