feat: sched: Finalize* move selectors
This commit is contained in:
parent
b576008e87
commit
8c6cba7a03
21
extern/sector-storage/manager.go
vendored
21
extern/sector-storage/manager.go
vendored
@ -589,6 +589,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||||
unsealed := storiface.FTUnsealed
|
unsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
@ -601,6 +602,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure that the sealed file is still in sealing storage; In case it already
|
||||||
|
// isn't, we want to do finalize in long-term storage
|
||||||
pathType := storiface.PathStorage
|
pathType := storiface.PathStorage
|
||||||
{
|
{
|
||||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTSealed, 0, false)
|
||||||
@ -616,6 +619,8 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do the cache trimming wherever the likely still very large cache lives.
|
||||||
|
// we really don't want to move it.
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
|
||||||
@ -628,7 +633,10 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
|
// get a selector for moving stuff into long-term storage
|
||||||
|
fetchSel := newMoveSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
|
||||||
|
|
||||||
|
// only move the unsealed file if it still exists and needs moving
|
||||||
moveUnsealed := unsealed
|
moveUnsealed := unsealed
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
if len(keepUnsealed) == 0 {
|
||||||
@ -636,6 +644,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector storage.SectorRef,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// move stuff to long-term storage
|
||||||
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
|
||||||
func(ctx context.Context, w Worker) error {
|
func(ctx context.Context, w Worker) error {
|
||||||
@ -657,6 +666,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
return xerrors.Errorf("acquiring sector lock: %w", err)
|
return xerrors.Errorf("acquiring sector lock: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// first check if the unsealed file exists anywhere; If it doesn't ignore it
|
||||||
moveUnsealed := storiface.FTUnsealed
|
moveUnsealed := storiface.FTUnsealed
|
||||||
{
|
{
|
||||||
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false)
|
||||||
@ -669,6 +679,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Make sure that the update file is still in sealing storage; In case it already
|
||||||
|
// isn't, we want to do finalize in long-term storage
|
||||||
pathType := storiface.PathStorage
|
pathType := storiface.PathStorage
|
||||||
{
|
{
|
||||||
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
|
sealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUpdate, 0, false)
|
||||||
@ -684,7 +696,9 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false)
|
// do the cache trimming wherever the likely still large cache lives.
|
||||||
|
// we really don't want to move it.
|
||||||
|
selector := newExistingSelector(m.index, sector.ID, storiface.FTUpdateCache, false)
|
||||||
|
|
||||||
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector,
|
||||||
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove),
|
||||||
@ -697,7 +711,8 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect
|
|||||||
}
|
}
|
||||||
|
|
||||||
move := func(types storiface.SectorFileType) error {
|
move := func(types storiface.SectorFileType) error {
|
||||||
fetchSel := newAllocSelector(m.index, types, storiface.PathStorage)
|
// get a selector for moving stuff into long-term storage
|
||||||
|
fetchSel := newMoveSelector(m.index, sector.ID, types, storiface.PathStorage)
|
||||||
{
|
{
|
||||||
if len(keepUnsealed) == 0 {
|
if len(keepUnsealed) == 0 {
|
||||||
moveUnsealed = storiface.FTNone
|
moveUnsealed = storiface.FTNone
|
||||||
|
4
extern/sector-storage/sched_test.go
vendored
4
extern/sector-storage/sched_test.go
vendored
@ -584,9 +584,9 @@ func TestSched(t *testing.T) {
|
|||||||
|
|
||||||
type slowishSelector bool
|
type slowishSelector bool
|
||||||
|
|
||||||
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, error) {
|
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *WorkerHandle) (bool, bool, error) {
|
||||||
time.Sleep(200 * time.Microsecond)
|
time.Sleep(200 * time.Microsecond)
|
||||||
return bool(s), nil
|
return bool(s), false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
96
extern/sector-storage/selector_move.go
vendored
Normal file
96
extern/sector-storage/selector_move.go
vendored
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
package sectorstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
|
||||||
|
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
|
||||||
|
)
|
||||||
|
|
||||||
|
type moveSelector struct {
|
||||||
|
index stores.SectorIndex
|
||||||
|
sector abi.SectorID
|
||||||
|
alloc storiface.SectorFileType
|
||||||
|
destPtype storiface.PathType
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType) *moveSelector {
|
||||||
|
return &moveSelector{
|
||||||
|
index: index,
|
||||||
|
sector: sector,
|
||||||
|
alloc: alloc,
|
||||||
|
destPtype: destPtype,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *moveSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
|
}
|
||||||
|
if _, supported := tasks[task]; !supported {
|
||||||
|
return false, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
paths, err := whnd.workerRpc.Paths(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting worker paths: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
workerPaths := map[storiface.ID]int{}
|
||||||
|
for _, path := range paths {
|
||||||
|
workerPaths[path.ID] = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize, err := spt.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("getting sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: allowFetch is always false here, because we want to find workers with
|
||||||
|
// the sector available locally
|
||||||
|
preferred, err := s.index.StorageFindSector(ctx, s.sector, s.alloc, ssize, false)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("finding preferred storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, info := range preferred {
|
||||||
|
if _, ok := workerPaths[info.ID]; ok {
|
||||||
|
workerPaths[info.ID]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
best, err := s.index.StorageBestAlloc(ctx, s.alloc, ssize, s.destPtype)
|
||||||
|
if err != nil {
|
||||||
|
return false, false, xerrors.Errorf("finding best dest storage: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
for _, info := range best {
|
||||||
|
if n, has := workerPaths[info.ID]; has {
|
||||||
|
ok = true
|
||||||
|
|
||||||
|
// if the worker has a local path with the sector already in it
|
||||||
|
// prefer that worker; This usually meant that the move operation is
|
||||||
|
// either a no-op because the sector is already in the correct path,
|
||||||
|
// or the move a local move.
|
||||||
|
if n > 0 {
|
||||||
|
return true, true, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok, false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *moveSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
return a.Utilization() < b.Utilization(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ WorkerSelector = &moveSelector{}
|
6
extern/sector-storage/selector_task.go
vendored
6
extern/sector-storage/selector_task.go
vendored
@ -19,14 +19,14 @@ func newTaskSelector() *taskSelector {
|
|||||||
return &taskSelector{}
|
return &taskSelector{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, error) {
|
func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *WorkerHandle) (bool, bool, error) {
|
||||||
tasks, err := whnd.TaskTypes(ctx)
|
tasks, err := whnd.TaskTypes(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, xerrors.Errorf("getting supported worker task types: %w", err)
|
return false, false, xerrors.Errorf("getting supported worker task types: %w", err)
|
||||||
}
|
}
|
||||||
_, supported := tasks[task]
|
_, supported := tasks[task]
|
||||||
|
|
||||||
return supported, nil
|
return supported, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *WorkerHandle) (bool, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user