Merge pull request #8710 from filecoin-project/feat/stor-fin-move-selector
feat: sched: Finalize* move selectors
This commit is contained in:
commit
7836e20801
@ -542,6 +542,22 @@
|
|||||||
# env var: LOTUS_STORAGE_ASSIGNER
|
# env var: LOTUS_STORAGE_ASSIGNER
|
||||||
#Assigner = "utilization"
|
#Assigner = "utilization"
|
||||||
|
|
||||||
|
# DisallowRemoteFinalize when set to true will force all Finalize tasks to
|
||||||
|
# run on workers with local access to both long-term storage and the sealing
|
||||||
|
# path containing the sector.
|
||||||
|
# --
|
||||||
|
# WARNING: Only set this if all workers have access to long-term storage
|
||||||
|
# paths. If this flag is enabled, and there are workers without long-term
|
||||||
|
# storage access, sectors will not be moved from them, and Finalize tasks
|
||||||
|
# will appear to be stuck.
|
||||||
|
# --
|
||||||
|
# If you see stuck Finalize tasks after enabling this setting, check
|
||||||
|
# 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
|
||||||
|
#
|
||||||
|
# type: bool
|
||||||
|
# env var: LOTUS_STORAGE_DISALLOWREMOTEFINALIZE
|
||||||
|
#DisallowRemoteFinalize = false
|
||||||
|
|
||||||
# ResourceFiltering instructs the system which resource filtering strategy
|
# ResourceFiltering instructs the system which resource filtering strategy
|
||||||
# to use when evaluating tasks against this worker. An empty value defaults
|
# to use when evaluating tasks against this worker. An empty value defaults
|
||||||
# to "hardware".
|
# to "hardware".
|
||||||
|
29
extern/sector-storage/manager.go
vendored
29
extern/sector-storage/manager.go
vendored
@ -71,7 +71,8 @@ type Manager struct {
|
|||||||
workLk sync.Mutex
|
workLk sync.Mutex
|
||||||
work *statestore.StateStore
|
work *statestore.StateStore
|
||||||
|
|
||||||
parallelCheckLimit int
|
parallelCheckLimit int
|
||||||
|
disallowRemoteFinalize bool
|
||||||
|
|
||||||
callToWork map[storiface.CallID]WorkID
|
callToWork map[storiface.CallID]WorkID
|
||||||
// used when we get an early return and there's no callToWork mapping
|
// used when we get an early return and there's no callToWork mapping
|
||||||
@ -123,6 +124,8 @@ type Config struct {
|
|||||||
// PoSt config
|
// PoSt config
|
||||||
ParallelCheckLimit int
|
ParallelCheckLimit int
|
||||||
|
|
||||||
|
DisallowRemoteFinalize bool
|
||||||
|
|
||||||
Assigner string
|
Assigner string
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +158,8 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
|
|||||||
|
|
||||||
localProver: prover,
|
localProver: prover,
|
||||||
|
|
||||||
parallelCheckLimit: sc.ParallelCheckLimit,
|
parallelCheckLimit: sc.ParallelCheckLimit,
|
||||||
|
disallowRemoteFinalize: sc.DisallowRemoteFinalize,
|
||||||
|
|
||||||
work: mss,
|
work: mss,
|
||||||
callToWork: map[storiface.CallID]WorkID{},
|
callToWork: map[storiface.CallID]WorkID{},
|
||||||
@ -592,6 +596,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||||
unsealed := storiface.FTUnsealed
|
unsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
@ -604,6 +609,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure that the sealed file is still in sealing storage; In case it already
|
||||||
|
// isn't, we want to do finalize in long-term storage
|
||||||
pathType := storiface.PathStorage
|
pathType := storiface.PathStorage
|
||||||
{
|
{
|
||||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||||
@ -619,6 +626,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do the cache trimming wherever the likely still very large cache lives.
|
||||||
|
// we really don't want to move it.
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
@ -631,7 +640,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
|
// get a selector for moving stuff into long-term storage
|
||||||
|
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage, !m.disallowRemoteFinalize)
|
||||||
|
|
||||||
|
// only move the unsealed file if it still exists and needs moving
|
||||||
moveUnsealed := unsealed
|
moveUnsealed := unsealed
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
if len(keepUnsealed) == 0 {
|
||||||
@ -639,6 +651,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// move stuff to long-term storage
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
@ -660,6 +673,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||||
moveUnsealed := storiface.FTUnsealed
|
moveUnsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
@ -672,6 +686,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure that the update file is still in sealing storage; In case it already
|
||||||
|
// isn't, we want to do finalize in long-term storage
|
||||||
pathType := storiface.PathStorage
|
pathType := storiface.PathStorage
|
||||||
{
|
{
|
||||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
|
||||||
@ -687,7 +703,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
|
// do the cache trimming wherever the likely still large cache lives.
|
||||||
|
// we really don't want to move it.
|
||||||
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
||||||
@ -700,7 +718,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
move := func(types storiface.SectorFileType) error {
|
move := func(types storiface.SectorFileType) error {
|
||||||
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
|
// get a selector for moving stuff into long-term storage
|
||||||
|
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage, !m.disallowRemoteFinalize)
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
if len(keepUnsealed) == 0 {
|
||||||
moveUnsealed = storiface.FTNone
|
moveUnsealed = storiface.FTNone
|
||||||
|
4
extern/sector-storage/sched.go
vendored
4
extern/sector-storage/sched.go
vendored
@ -44,7 +44,9 @@ const mib = 1 << 20
|
|||||||
type WorkerAction func(ctx context.Context, w Worker) error
|
type WorkerAction func(ctx context.Context, w Worker) error
|
||||||
|
|
||||||
type WorkerSelector interface {
|
type WorkerSelector interface {
|
||||||
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) // true if worker is acceptable for performing a task
|
// Ok is true if worker is acceptable for performing a task.
|
||||||
|
// If any worker is preferred for a task, other workers won't be considered for that task.
|
||||||
|
Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (ok, preferred bool, err error)
|
||||||
|
|
||||||
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
|
Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) // true if a is preferred over b
|
||||||
}
|
}
|
||||||
|
17
extern/sector-storage/sched_assigner_common.go
vendored
17
extern/sector-storage/sched_assigner_common.go
vendored
@ -61,8 +61,10 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
task := (*sh.SchedQueue)[sqi]
|
task := (*sh.SchedQueue)[sqi]
|
||||||
|
|
||||||
task.IndexHeap = sqi
|
task.IndexHeap = sqi
|
||||||
|
|
||||||
|
var havePreferred bool
|
||||||
|
|
||||||
for wnd, windowRequest := range sh.OpenWindows {
|
for wnd, windowRequest := range sh.OpenWindows {
|
||||||
worker, ok := sh.Workers[windowRequest.Worker]
|
worker, ok := sh.Workers[windowRequest.Worker]
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -84,7 +86,7 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
|
rpcCtx, cancel := context.WithTimeout(task.Ctx, SelectorTimeout)
|
||||||
ok, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
|
ok, preferred, err := task.Sel.Ok(rpcCtx, task.TaskType, task.Sector.ProofType, worker)
|
||||||
cancel()
|
cancel()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
|
log.Errorf("trySched(1) req.Sel.Ok error: %+v", err)
|
||||||
@ -95,6 +97,17 @@ func (a *AssignerCommon) TrySched(sh *Scheduler) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if havePreferred && !preferred {
|
||||||
|
// we have a way better worker for this task
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if preferred && !havePreferred {
|
||||||
|
// all workers we considered previously are much worse choice
|
||||||
|
acceptableWindows[sqi] = acceptableWindows[sqi][:0]
|
||||||
|
havePreferred = true
|
||||||
|
}
|
||||||
|
|
||||||
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
|
acceptableWindows[sqi] = append(acceptableWindows[sqi], wnd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
4
extern/sector-storage/sched_test.go
vendored
4
extern/sector-storage/sched_test.go
vendored
@ -584,9 +584,9 @@ func TestSched(t *testing.T) {
|
|||||||
|
|
||||||
type slowishSelector bool
|
type slowishSelector bool
|
||||||
|
|
||||||
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) {
|
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
|
||||||
time.Sleep(200 * time.Microsecond)
|
time.Sleep(200 * time.Microsecond)
|
||||||
return bool(s), nil
|
return bool(s), false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
16
extern/sector-storage/selector_alloc.go
vendored
16
extern/sector-storage/selector_alloc.go
vendored
@ -26,18 +26,18 @@ func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
|
func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
tasks, err := whnd.TaskTypes(ctx)
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
}
|
}
|
||||||
if _, supported := tasks[task]; !supported {
|
if _, supported := tasks[task]; !supported {
|
||||||
return false, nil
|
return false, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
paths, err := whnd.workerRpc.Paths(ctx)
|
paths, err := whnd.workerRpc.Paths(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting worker paths: %w", err)
|
return false, false, xerrors.Errorf("getting worker paths: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
have := map[storiface.ID]struct{}{}
|
have := map[storiface.ID]struct{}{}
|
||||||
@ -47,21 +47,21 @@ func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi
|
|||||||
|
|
||||||
ssize, err := spt.SectorSize()
|
ssize, err := spt.SectorSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting sector size: %w", err)
|
return false, false, xerrors.Errorf("getting sector size: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
|
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.ptype)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("finding best alloc storage: %w", err)
|
return false, false, xerrors.Errorf("finding best alloc storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range best {
|
for _, info := range best {
|
||||||
if _, ok := have[info.ID]; ok {
|
if _, ok := have[info.ID]; ok {
|
||||||
return true, nil
|
return true, false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
16
extern/sector-storage/selector_existing.go
vendored
16
extern/sector-storage/selector_existing.go
vendored
@ -28,18 +28,18 @@ func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
|
func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
tasks, err := whnd.TaskTypes(ctx)
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
}
|
}
|
||||||
if _, supported := tasks[task]; !supported {
|
if _, supported := tasks[task]; !supported {
|
||||||
return false, nil
|
return false, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
paths, err := whnd.workerRpc.Paths(ctx)
|
paths, err := whnd.workerRpc.Paths(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting worker paths: %w", err)
|
return false, false, xerrors.Errorf("getting worker paths: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
have := map[storiface.ID]struct{}{}
|
have := map[storiface.ID]struct{}{}
|
||||||
@ -49,21 +49,21 @@ func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt
|
|||||||
|
|
||||||
ssize, err := spt.SectorSize()
|
ssize, err := spt.SectorSize()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting sector size: %w", err)
|
return false, false, xerrors.Errorf("getting sector size: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
|
best, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, s.allowFetch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("finding best storage: %w", err)
|
return false, false, xerrors.Errorf("finding best storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, info := range best {
|
for _, info := range best {
|
||||||
if _, ok := have[info.ID]; ok {
|
if _, ok := have[info.ID]; ok {
|
||||||
return true, nil
|
return true, false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false, nil
|
return false, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
98
extern/sector-storage/selector_move.go
vendored
Normal file
98
extern/sector-storage/selector_move.go
vendored
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
package sectorstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
|
)
|
||||||
|
|
||||||
|
type moveSelector struct {
|
||||||
|
index stores.SectorIndex
|
||||||
|
sector abi.SectorID
|
||||||
|
alloc storiface.SectorFileType
|
||||||
|
destPtype storiface.PathType
|
||||||
|
allowRemote bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
|
||||||
|
return &moveSelector{
|
||||||
|
index: index,
|
||||||
|
sector: sector,
|
||||||
|
alloc: alloc,
|
||||||
|
destPtype: destPtype,
|
||||||
|
allowRemote: allowRemote,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
|
}
|
||||||
|
if _, supported := tasks[task]; !supported {
|
||||||
|
return false, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
paths, err := whnd.workerRpc.Paths(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting worker paths: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
workerPaths := map[storiface.ID]int{}
|
||||||
|
for _, path := range paths {
|
||||||
|
workerPaths[path.ID] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize, err := spt.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: allowFetch is always false here, because we want to find workers with
|
||||||
|
// the sector available locally
|
||||||
|
preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("finding preferred storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range preferred {
|
||||||
|
if _, ok := workerPaths[info.ID]; ok {
|
||||||
|
workerPaths[info.ID]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
for _, info := range best {
|
||||||
|
if n, has := workerPaths[info.ID]; has {
|
||||||
|
ok = true
|
||||||
|
|
||||||
|
// if the worker has a local path with the sector already in it
|
||||||
|
// prefer that worker; This usually meant that the move operation is
|
||||||
|
// either a no-op because the sector is already in the correct path,
|
||||||
|
// or the move a local move.
|
||||||
|
if n > 0 {
|
||||||
|
return true, true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok && s.allowRemote, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
return a.Utilization() < b.Utilization(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ WorkerSelector = &moveSelector{}
|
6
extern/sector-storage/selector_task.go
vendored
6
extern/sector-storage/selector_task.go
vendored
@ -19,14 +19,14 @@ func newTaskSelector() *taskSelector {
|
|||||||
return &taskSelector{}
|
return &taskSelector{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
|
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
tasks, err := whnd.TaskTypes(ctx)
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
}
|
}
|
||||||
_, supported := tasks[task]
|
_, supported := tasks[task]
|
||||||
|
|
||||||
return supported, nil
|
return supported, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
@ -571,6 +571,7 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
|
|
||||||
noLocal := m.options.minerNoLocalSealing
|
noLocal := m.options.minerNoLocalSealing
|
||||||
assigner := m.options.minerAssigner
|
assigner := m.options.minerAssigner
|
||||||
|
disallowRemoteFinalize := m.options.disallowRemoteFinalize
|
||||||
|
|
||||||
var mineBlock = make(chan lotusminer.MineReq)
|
var mineBlock = make(chan lotusminer.MineReq)
|
||||||
opts := []node.Option{
|
opts := []node.Option{
|
||||||
@ -597,6 +598,7 @@ func (n *Ensemble) Start() *Ensemble {
|
|||||||
}
|
}
|
||||||
|
|
||||||
scfg.Storage.Assigner = assigner
|
scfg.Storage.Assigner = assigner
|
||||||
|
scfg.Storage.DisallowRemoteFinalize = disallowRemoteFinalize
|
||||||
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
|
scfg.Storage.ResourceFiltering = sectorstorage.ResourceFilteringDisabled
|
||||||
return scfg.StorageManager()
|
return scfg.StorageManager()
|
||||||
}),
|
}),
|
||||||
|
@ -34,14 +34,15 @@ type nodeOpts struct {
|
|||||||
ownerKey *wallet.Key
|
ownerKey *wallet.Key
|
||||||
extraNodeOpts []node.Option
|
extraNodeOpts []node.Option
|
||||||
|
|
||||||
subsystems MinerSubsystem
|
subsystems MinerSubsystem
|
||||||
mainMiner *TestMiner
|
mainMiner *TestMiner
|
||||||
disableLibp2p bool
|
disableLibp2p bool
|
||||||
optBuilders []OptBuilder
|
optBuilders []OptBuilder
|
||||||
sectorSize abi.SectorSize
|
sectorSize abi.SectorSize
|
||||||
maxStagingDealsBytes int64
|
maxStagingDealsBytes int64
|
||||||
minerNoLocalSealing bool // use worker
|
minerNoLocalSealing bool // use worker
|
||||||
minerAssigner string
|
minerAssigner string
|
||||||
|
disallowRemoteFinalize bool
|
||||||
|
|
||||||
workerTasks []sealtasks.TaskType
|
workerTasks []sealtasks.TaskType
|
||||||
workerStorageOpt func(stores.Store) stores.Store
|
workerStorageOpt func(stores.Store) stores.Store
|
||||||
@ -105,6 +106,13 @@ func WithAssigner(a string) NodeOpt {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithDisallowRemoteFinalize(d bool) NodeOpt {
|
||||||
|
return func(opts *nodeOpts) error {
|
||||||
|
opts.disallowRemoteFinalize = d
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func DisableLibp2p() NodeOpt {
|
func DisableLibp2p() NodeOpt {
|
||||||
return func(opts *nodeOpts) error {
|
return func(opts *nodeOpts) error {
|
||||||
opts.disableLibp2p = true
|
opts.disableLibp2p = true
|
||||||
|
@ -57,6 +57,22 @@ func TestWorkerPledgeSpread(t *testing.T) {
|
|||||||
miner.PledgeSectors(ctx, 1, 0, nil)
|
miner.PledgeSectors(ctx, 1, 0, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWorkerPledgeLocalFin(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
_, miner, worker, ens := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(),
|
||||||
|
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTPreCommit2, sealtasks.TTCommit2, sealtasks.TTUnseal}),
|
||||||
|
kit.WithDisallowRemoteFinalize(true),
|
||||||
|
) // no mock proofs
|
||||||
|
|
||||||
|
ens.InterconnectAll().BeginMining(50 * time.Millisecond)
|
||||||
|
|
||||||
|
e, err := worker.Enabled(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, e)
|
||||||
|
|
||||||
|
miner.PledgeSectors(ctx, 1, 0, nil)
|
||||||
|
}
|
||||||
|
|
||||||
func TestWorkerDataCid(t *testing.T) {
|
func TestWorkerDataCid(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
_, miner, worker, _ := kit.EnsembleWorker(t, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.WithNoLocalSealing(true),
|
||||||
|
@ -763,6 +763,22 @@ This parameter is ONLY applicable if the retrieval pricing policy strategy has b
|
|||||||
Comment: `Assigner specifies the worker assigner to use when scheduling tasks.
|
Comment: `Assigner specifies the worker assigner to use when scheduling tasks.
|
||||||
"utilization" (default) - assign tasks to workers with lowest utilization.
|
"utilization" (default) - assign tasks to workers with lowest utilization.
|
||||||
"spread" - assign tasks to as many distinct workers as possible.`,
|
"spread" - assign tasks to as many distinct workers as possible.`,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "DisallowRemoteFinalize",
|
||||||
|
Type: "bool",
|
||||||
|
|
||||||
|
Comment: `DisallowRemoteFinalize when set to true will force all Finalize tasks to
|
||||||
|
run on workers with local access to both long-term storage and the sealing
|
||||||
|
path containing the sector.
|
||||||
|
--
|
||||||
|
WARNING: Only set this if all workers have access to long-term storage
|
||||||
|
paths. If this flag is enabled, and there are workers without long-term
|
||||||
|
storage access, sectors will not be moved from them, and Finalize tasks
|
||||||
|
will appear to be stuck.
|
||||||
|
--
|
||||||
|
If you see stuck Finalize tasks after enabling this setting, check
|
||||||
|
'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'`,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Name: "ResourceFiltering",
|
Name: "ResourceFiltering",
|
||||||
|
@ -63,6 +63,7 @@ func (c *StorageMiner) StorageManager() sectorstorage.Config {
|
|||||||
AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
|
AllowProveReplicaUpdate2: c.Storage.AllowProveReplicaUpdate2,
|
||||||
AllowRegenSectorKey: c.Storage.AllowRegenSectorKey,
|
AllowRegenSectorKey: c.Storage.AllowRegenSectorKey,
|
||||||
ResourceFiltering: c.Storage.ResourceFiltering,
|
ResourceFiltering: c.Storage.ResourceFiltering,
|
||||||
|
DisallowRemoteFinalize: c.Storage.DisallowRemoteFinalize,
|
||||||
|
|
||||||
Assigner: c.Storage.Assigner,
|
Assigner: c.Storage.Assigner,
|
||||||
|
|
||||||
|
@ -335,6 +335,19 @@ type SealerConfig struct {
|
|||||||
// "spread" - assign tasks to as many distinct workers as possible.
|
// "spread" - assign tasks to as many distinct workers as possible.
|
||||||
Assigner string
|
Assigner string
|
||||||
|
|
||||||
|
// DisallowRemoteFinalize when set to true will force all Finalize tasks to
|
||||||
|
// run on workers with local access to both long-term storage and the sealing
|
||||||
|
// path containing the sector.
|
||||||
|
// --
|
||||||
|
// WARNING: Only set this if all workers have access to long-term storage
|
||||||
|
// paths. If this flag is enabled, and there are workers without long-term
|
||||||
|
// storage access, sectors will not be moved from them, and Finalize tasks
|
||||||
|
// will appear to be stuck.
|
||||||
|
// --
|
||||||
|
// If you see stuck Finalize tasks after enabling this setting, check
|
||||||
|
// 'lotus-miner sealing sched-diag' and 'lotus-miner storage find [sector num]'
|
||||||
|
DisallowRemoteFinalize bool
|
||||||
|
|
||||||
// ResourceFiltering instructs the system which resource filtering strategy
|
// ResourceFiltering instructs the system which resource filtering strategy
|
||||||
// to use when evaluating tasks against this worker. An empty value defaults
|
// to use when evaluating tasks against this worker. An empty value defaults
|
||||||
// to "hardware".
|
// to "hardware".
|
||||||
|
Loading…
Reference in New Issue
Block a user