Merge pull request #20 from filecoin-project/feat/overallocate

Refactor scheduling logic

commit 83b1b94034

Makefile | 29 (new file)
@@ -0,0 +1,29 @@
+all: build
+.PHONY: all
+
+SUBMODULES=
+
+FFI_PATH:=./extern/filecoin-ffi/
+FFI_DEPS:=.install-filcrypto
+FFI_DEPS:=$(addprefix $(FFI_PATH),$(FFI_DEPS))
+
+$(FFI_DEPS): .filecoin-build ;
+
+.filecoin-build: $(FFI_PATH)
+	$(MAKE) -C $(FFI_PATH) $(FFI_DEPS:$(FFI_PATH)%=%)
+	@touch $@
+
+.update-modules:
+	git submodule update --init --recursive
+	@touch $@
+
+test: .update-modules .filecoin-build
+	go test -v ./...
+.PHONY: test
+SUBMODULES+=test
+
+build: $(SUBMODULES)
+
+clean:
+	rm -f .filecoin-build
+	rm -f .update-modules
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"os"
+	"runtime"
 
 	"github.com/elastic/go-sysinfo"
 	"golang.org/x/xerrors"
@@ -103,6 +104,15 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
 	return sb.AddPiece(ctx, sector, epcs, sz, r)
 }
 
+func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, sealing bool) error {
+	_, done, err := (&localWorkerPathProvider{w: l}).AcquireSector(ctx, sector, fileType, stores.FTNone, sealing)
+	if err != nil {
+		return err
+	}
+	done()
+	return nil
+}
+
 func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage2.PreCommit1Out, err error) {
 	sb, err := l.sb()
 	if err != nil {
@@ -195,6 +205,7 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
 			MemPhysical: mem.Total,
 			MemSwap:     mem.VirtualTotal,
 			MemReserved: mem.VirtualUsed + mem.Total - mem.Available, // TODO: sub this process
+			CPUs:        uint64(runtime.NumCPU()),
 			GPUs:        gpus,
 		},
 	}, nil
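The new LocalWorker.Fetch above works purely by side effect: acquiring the sector through the worker's path provider pulls the data into local storage, and the returned release callback is invoked immediately. A minimal stand-alone sketch of that acquire-then-release shape; the acquire function here is a simplified stand-in for localWorkerPathProvider.AcquireSector, not the real stores API:

package main

import (
	"context"
	"fmt"
)

// acquire is a stand-in for the worker's path provider: it makes the sector
// data available locally and returns a release callback.
func acquire(ctx context.Context, sector string) (release func(), err error) {
	fmt.Println("ensuring", sector, "is present in local storage")
	return func() { fmt.Println("released", sector) }, nil
}

// fetch mirrors the shape of the new LocalWorker.Fetch: the acquisition itself
// is the useful work, so the release callback is called right away.
func fetch(ctx context.Context, sector string) error {
	done, err := acquire(ctx, sector)
	if err != nil {
		return err
	}
	done()
	return nil
}

func main() {
	_ = fetch(context.Background(), "s-t0101-1")
}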
manager.go | 292
@@ -1,12 +1,10 @@
 package sectorstorage
 
 import (
-	"container/list"
 	"context"
 	"errors"
 	"io"
 	"net/http"
-	"sync"
 
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
@@ -30,6 +28,7 @@ type URLs []string
 
 type Worker interface {
 	ffiwrapper.StorageSealer
+	Fetch(context.Context, abi.SectorID, stores.SectorFileType, bool) error
 
 	TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
 
@@ -61,18 +60,9 @@ type Manager struct {
 	remoteHnd *stores.FetchHandler
 	index     stores.SectorIndex
 
+	sched *scheduler
+
 	storage.Prover
-
-	workersLk  sync.Mutex
-	nextWorker WorkerID
-	workers    map[WorkerID]*workerHandle
-
-	newWorkers chan *workerHandle
-	schedule   chan *workerRequest
-	workerFree chan WorkerID
-	closing    chan struct{}
-
-	schedQueue *list.List // List[*workerRequest]
 }
 
 type SealerConfig struct {
@@ -106,23 +96,15 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
 		remoteHnd: &stores.FetchHandler{Local: lstor},
 		index:     si,
 
-		nextWorker: 0,
-		workers:    map[WorkerID]*workerHandle{},
-
-		newWorkers: make(chan *workerHandle),
-		schedule:   make(chan *workerRequest),
-		workerFree: make(chan WorkerID),
-		closing:    make(chan struct{}),
-
-		schedQueue: list.New(),
+		sched: newScheduler(cfg.SealProofType),
 
 		Prover: prover,
 	}
 
-	go m.runSched()
+	go m.sched.runSched()
 
 	localTasks := []sealtasks.TaskType{
-		sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize,
+		sealtasks.TTAddPiece, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
 	}
 	if sc.AllowPreCommit1 {
 		localTasks = append(localTasks, sealtasks.TTPreCommit1)
@@ -169,9 +151,11 @@ func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
 		return xerrors.Errorf("getting worker info: %w", err)
 	}
 
-	m.newWorkers <- &workerHandle{
+	m.sched.newWorkers <- &workerHandle{
 		w:    w,
 		info: info,
+		preparing: &activeResources{},
+		active:    &activeResources{},
 	}
 	return nil
 }
@@ -189,81 +173,13 @@ func (m *Manager) ReadPieceFromSealedSector(context.Context, abi.SectorID, ffiwr
 	panic("implement me")
 }
 
-func (m *Manager) getWorkersByPaths(task sealtasks.TaskType, inPaths []stores.StorageInfo) ([]WorkerID, map[WorkerID]stores.StorageInfo) {
-	m.workersLk.Lock()
-	defer m.workersLk.Unlock()
-
-	var workers []WorkerID
-	paths := map[WorkerID]stores.StorageInfo{}
-
-	for i, worker := range m.workers {
-		tt, err := worker.w.TaskTypes(context.TODO())
-		if err != nil {
-			log.Errorf("error getting supported worker task types: %+v", err)
-			continue
-		}
-		if _, ok := tt[task]; !ok {
-			log.Debugf("dropping worker %d; task %s not supported (supports %v)", i, task, tt)
-			continue
-		}
-
-		phs, err := worker.w.Paths(context.TODO())
-		if err != nil {
-			log.Errorf("error getting worker paths: %+v", err)
-			continue
-		}
-
-		// check if the worker has access to the path we selected
-		var st *stores.StorageInfo
-		for _, p := range phs {
-			for _, meta := range inPaths {
-				if p.ID == meta.ID {
-					if st != nil && st.Weight > p.Weight {
-						continue
-					}
-
-					p := meta // copy
-					st = &p
-				}
-			}
-		}
-		if st == nil {
-			log.Debugf("skipping worker %d; doesn't have any of %v", i, inPaths)
-			log.Debugf("skipping worker %d; only has %v", i, phs)
-			continue
-		}
-
-		paths[i] = *st
-		workers = append(workers, i)
-	}
-
-	return workers, paths
+func schedNop(context.Context, Worker) error {
+	return nil
 }
 
-func (m *Manager) getWorker(ctx context.Context, taskType sealtasks.TaskType, accept []WorkerID) (Worker, func(), error) {
-	ret := make(chan workerResponse)
-
-	select {
-	case m.schedule <- &workerRequest{
-		taskType: taskType,
-		accept:   accept,
-
-		cancel: ctx.Done(),
-		ret:    ret,
-	}:
-	case <-m.closing:
-		return nil, nil, xerrors.New("closing")
-	case <-ctx.Done():
-		return nil, nil, ctx.Err()
-	}
-
-	select {
-	case resp := <-ret:
-		return resp.worker, resp.done, resp.err
-	case <-m.closing:
-		return nil, nil, xerrors.New("closing")
-	case <-ctx.Done():
-		return nil, nil, ctx.Err()
-	}
+func schedFetch(sector abi.SectorID, ft stores.SectorFileType, sealing bool) func(context.Context, Worker) error {
+	return func(ctx context.Context, worker Worker) error {
+		return worker.Fetch(ctx, sector, ft, sealing)
+	}
 }
 
@@ -273,151 +189,114 @@ func (m *Manager) NewSector(ctx context.Context, sector abi.SectorID) error {
 }
 
 func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPieces []abi.UnpaddedPieceSize, sz abi.UnpaddedPieceSize, r io.Reader) (abi.PieceInfo, error) {
-	// TODO: consider multiple paths vs workers when initially allocating
-
-	var best []stores.StorageInfo
+	var selector WorkerSelector
 	var err error
 	if len(existingPieces) == 0 { // new
-		best, err = m.index.StorageBestAlloc(ctx, stores.FTUnsealed, true)
+		selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed)
 	} else { // append to existing
-		best, err = m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, false)
+		selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false)
 	}
 	if err != nil {
-		return abi.PieceInfo{}, xerrors.Errorf("finding sector path: %w", err)
+		return abi.PieceInfo{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	log.Debugf("find workers for %v", best)
-	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTAddPiece, best)
-
-	if len(candidateWorkers) == 0 {
-		return abi.PieceInfo{}, ErrNoWorkers
-	}
-
-	worker, done, err := m.getWorker(ctx, sealtasks.TTAddPiece, candidateWorkers)
-	if err != nil {
-		return abi.PieceInfo{}, xerrors.Errorf("scheduling worker: %w", err)
-	}
-	defer done()
-
-	// TODO: select(candidateWorkers, ...)
-	// TODO: remove the sectorbuilder abstraction, pass path directly
-	return worker.AddPiece(ctx, sector, existingPieces, sz, r)
+	var out abi.PieceInfo
+	err = m.sched.Schedule(ctx, sealtasks.TTAddPiece, selector, schedNop, func(ctx context.Context, w Worker) error {
+		p, err := w.AddPiece(ctx, sector, existingPieces, sz, r)
+		if err != nil {
+			return err
+		}
+		out = p
+		return nil
+	})
+
+	return out, err
 }
 
 func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
 	// TODO: also consider where the unsealed data sits
 
-	best, err := m.index.StorageBestAlloc(ctx, stores.FTCache|stores.FTSealed, true)
+	selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed)
 	if err != nil {
-		return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
+		return nil, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit1, best)
-	if len(candidateWorkers) == 0 {
-		return nil, ErrNoWorkers
-	}
-
-	worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit1, candidateWorkers)
-	if err != nil {
-		return nil, xerrors.Errorf("scheduling worker: %w", err)
-	}
-	defer done()
-
-	// TODO: select(candidateWorkers, ...)
-	// TODO: remove the sectorbuilder abstraction, pass path directly
-	return worker.SealPreCommit1(ctx, sector, ticket, pieces)
+	err = m.sched.Schedule(ctx, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, true), func(ctx context.Context, w Worker) error {
+		p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
+		if err != nil {
+			return err
+		}
+		out = p
+		return nil
+	})
+
+	return out, err
 }
 
-func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (cids storage.SectorCids, err error) {
-	// TODO: allow workers to fetch the sectors
-
-	best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true)
+func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (out storage.SectorCids, err error) {
+	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, true)
 	if err != nil {
-		return storage.SectorCids{}, xerrors.Errorf("finding path for sector sealing: %w", err)
+		return storage.SectorCids{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTPreCommit2, best)
-	if len(candidateWorkers) == 0 {
-		return storage.SectorCids{}, ErrNoWorkers
-	}
-
-	worker, done, err := m.getWorker(ctx, sealtasks.TTPreCommit2, candidateWorkers)
-	if err != nil {
-		return storage.SectorCids{}, xerrors.Errorf("scheduling worker: %w", err)
-	}
-	defer done()
-
-	// TODO: select(candidateWorkers, ...)
-	// TODO: remove the sectorbuilder abstraction, pass path directly
-	return worker.SealPreCommit2(ctx, sector, phase1Out)
+	err = m.sched.Schedule(ctx, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
+		p, err := w.SealPreCommit2(ctx, sector, phase1Out)
+		if err != nil {
+			return err
+		}
+		out = p
+		return nil
+	})
+	return out, err
 }
 
-func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (output storage.Commit1Out, err error) {
-	best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed, true)
+func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (out storage.Commit1Out, err error) {
+	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed, false)
 	if err != nil {
-		return nil, xerrors.Errorf("finding path for sector sealing: %w", err)
-	}
-
-	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTCommit1, best)
-	if len(candidateWorkers) == 0 {
-		return nil, ErrNoWorkers
+		return storage.Commit1Out{}, xerrors.Errorf("creating path selector: %w", err)
 	}
 
 	// TODO: Try very hard to execute on worker with access to the sectors
-	worker, done, err := m.getWorker(ctx, sealtasks.TTCommit1, candidateWorkers)
-	if err != nil {
-		return nil, xerrors.Errorf("scheduling worker: %w", err)
-	}
-	defer done()
-
-	// TODO: select(candidateWorkers, ...)
-	// TODO: remove the sectorbuilder abstraction, pass path directly
-	return worker.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
+	// (except, don't.. for now at least - we are using this step to bring data
+	// into 'provable' storage. Optimally we'd do that in commit2, in parallel
+	// with snark compute)
+	err = m.sched.Schedule(ctx, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, true), func(ctx context.Context, w Worker) error {
+		p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
+		if err != nil {
+			return err
+		}
+		out = p
+		return nil
+	})
+	return out, err
 }
 
-func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (proof storage.Proof, err error) {
-	var candidateWorkers []WorkerID
-
-	m.workersLk.Lock()
-	for id, worker := range m.workers {
-		tt, err := worker.w.TaskTypes(ctx)
-		if err != nil {
-			log.Errorf("error getting supported worker task types: %+v", err)
-			continue
-		}
-		if _, ok := tt[sealtasks.TTCommit2]; !ok {
-			continue
-		}
-		candidateWorkers = append(candidateWorkers, id)
-	}
-	m.workersLk.Unlock()
-	if len(candidateWorkers) == 0 {
-		return nil, ErrNoWorkers
-	}
-
-	worker, done, err := m.getWorker(ctx, sealtasks.TTCommit2, candidateWorkers)
-	if err != nil {
-		return nil, xerrors.Errorf("scheduling worker: %w", err)
-	}
-	defer done()
-
-	return worker.SealCommit2(ctx, sector, phase1Out)
+func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
+	selector := newTaskSelector()
+
+	err = m.sched.Schedule(ctx, sealtasks.TTCommit2, selector, schedNop, func(ctx context.Context, w Worker) error {
+		p, err := w.SealCommit2(ctx, sector, phase1Out)
+		if err != nil {
+			return err
+		}
+		out = p
+		return nil
+	})
+
+	return out, err
 }
 
 func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID) error {
-	best, err := m.index.StorageFindSector(ctx, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, true)
+	selector, err := newExistingSelector(ctx, m.index, sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false)
 	if err != nil {
-		return xerrors.Errorf("finding sealed sector: %w", err)
+		return xerrors.Errorf("creating path selector: %w", err)
 	}
 
-	candidateWorkers, _ := m.getWorkersByPaths(sealtasks.TTFinalize, best)
-	if len(candidateWorkers) == 0 {
-		return ErrNoWorkers
-	}
-
-	// TODO: Remove sector from sealing stores
-	// TODO: Move the sector to long-term storage
-	return m.workers[candidateWorkers[0]].w.FinalizeSector(ctx, sector)
+	return m.sched.Schedule(ctx, sealtasks.TTFinalize, selector,
+		schedFetch(sector, stores.FTCache|stores.FTSealed|stores.FTUnsealed, false),
+		func(ctx context.Context, w Worker) error {
+			return w.FinalizeSector(ctx, sector)
+		})
 }
 
 func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
@@ -439,8 +318,7 @@ func (m *Manager) FsStat(ctx context.Context, id stores.ID) (stores.FsStat, erro
 }
 
 func (m *Manager) Close() error {
-	close(m.closing)
-	return nil
+	return m.sched.Close()
 }
 
 var _ SectorManager = &Manager{}
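All of the Manager methods above now funnel through a single Schedule(ctx, taskType, selector, prepare, work) call: a prepare action (schedNop or schedFetch) runs on the chosen worker before the actual work closure. A minimal, self-contained sketch of that two-phase shape, using simplified stand-in types rather than the package's real Worker and scheduler:

package main

import (
	"context"
	"fmt"
)

// worker and action are simplified stand-ins for the diff's Worker and WorkerAction.
type worker struct{ name string }

type action func(ctx context.Context, w *worker) error

// schedule mimics the two-phase pattern from the diff: run prepare (e.g. fetching
// sector data) before work (e.g. a sealing step) on the chosen worker.
func schedule(ctx context.Context, w *worker, prepare, work action) error {
	if err := prepare(ctx, w); err != nil {
		return fmt.Errorf("prepare: %w", err)
	}
	return work(ctx, w)
}

func main() {
	w := &worker{name: "local"}
	prepare := func(ctx context.Context, w *worker) error {
		fmt.Println("fetch sector files to", w.name) // stands in for schedFetch
		return nil
	}
	work := func(ctx context.Context, w *worker) error {
		fmt.Println("run sealing task on", w.name) // stands in for w.SealPreCommit1(...)
		return nil
	}
	if err := schedule(context.Background(), w, prepare, work); err != nil {
		fmt.Println("error:", err)
	}
}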
resources.go | 94
@@ -23,12 +23,16 @@ type Resources struct {
 	MinMemory uint64 // What Must be in RAM for decent perf
 	MaxMemory uint64 // Memory required (swap + ram)
 
-	MultiThread bool
+	Threads int // -1 = multithread
 	CanGPU      bool
 
 	BaseMinMemory uint64 // What Must be in RAM for decent perf (shared between threads)
 }
 
+func (r Resources) MultiThread() bool {
+	return r.Threads == -1
+}
+
 const MaxCachingOverhead = 32 << 30
 
 var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
@@ -37,7 +41,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 32 << 30,
 			MinMemory: 32 << 30,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -45,7 +49,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 1 << 30,
 			MinMemory: 1 << 30,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -53,7 +57,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 2 << 10,
 			MinMemory: 2 << 10,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 2 << 10,
 		},
@@ -61,7 +65,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 8 << 20,
 			MinMemory: 8 << 20,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 8 << 20,
 		},
@@ -71,7 +75,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 64 << 30,
 			MinMemory: 32 << 30,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 30 << 30,
 		},
@@ -79,7 +83,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 3 << 29, // 1.5G
 			MinMemory: 1 << 30,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -87,7 +91,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 2 << 10,
 			MinMemory: 2 << 10,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 2 << 10,
 		},
@@ -95,7 +99,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 8 << 20,
 			MinMemory: 8 << 20,
 
-			MultiThread: false,
+			Threads: 1,
 
 			BaseMinMemory: 8 << 20,
 		},
@@ -105,7 +109,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 96 << 30,
 			MinMemory: 64 << 30,
 
-			MultiThread: true,
+			Threads: -1,
 
 			BaseMinMemory: 30 << 30,
 		},
@@ -113,7 +117,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 3 << 29, // 1.5G
 			MinMemory: 1 << 30,
 
-			MultiThread: true,
+			Threads: -1,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -121,7 +125,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 2 << 10,
 			MinMemory: 2 << 10,
 
-			MultiThread: true,
+			Threads: -1,
 
 			BaseMinMemory: 2 << 10,
 		},
@@ -129,7 +133,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 8 << 20,
 			MinMemory: 8 << 20,
 
-			MultiThread: true,
+			Threads: -1,
 
 			BaseMinMemory: 8 << 20,
 		},
@@ -139,7 +143,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 1 << 30,
 			MinMemory: 1 << 30,
 
-			MultiThread: false,
+			Threads: 0,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -147,7 +151,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 1 << 30,
 			MinMemory: 1 << 30,
 
-			MultiThread: false,
+			Threads: 0,
 
 			BaseMinMemory: 1 << 30,
 		},
@@ -155,7 +159,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 2 << 10,
 			MinMemory: 2 << 10,
 
-			MultiThread: false,
+			Threads: 0,
 
 			BaseMinMemory: 2 << 10,
 		},
@@ -163,7 +167,7 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 8 << 20,
 			MinMemory: 8 << 20,
 
-			MultiThread: false,
+			Threads: 0,
 
 			BaseMinMemory: 8 << 20,
 		},
@@ -173,8 +177,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 110 << 30,
 			MinMemory: 60 << 30,
 
-			MultiThread: true,
+			Threads: -1,
 			CanGPU:      true,
 
 			BaseMinMemory: 64 << 30, // params
 		},
@@ -182,8 +186,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 3 << 29, // 1.5G
 			MinMemory: 1 << 30,
 
-			MultiThread: false, // This is fine
+			Threads: 1, // This is fine
 			CanGPU:      true,
 
 			BaseMinMemory: 10 << 30,
 		},
@@ -191,8 +195,8 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 2 << 10,
 			MinMemory: 2 << 10,
 
-			MultiThread: false,
+			Threads: 1,
 			CanGPU:      true,
 
 			BaseMinMemory: 2 << 10,
 		},
@@ -200,10 +204,48 @@ var ResourceTable = map[sealtasks.TaskType]map[abi.RegisteredProof]Resources{
 			MaxMemory: 8 << 20,
 			MinMemory: 8 << 20,
 
-			MultiThread: false,
+			Threads: 1,
 			CanGPU:      true,
 
 			BaseMinMemory: 8 << 20,
 		},
 	},
+	sealtasks.TTFetch: {
+		abi.RegisteredProof_StackedDRG32GiBSeal: Resources{
+			MaxMemory: 1 << 20,
+			MinMemory: 1 << 20,
+
+			Threads: 0,
+			CanGPU:  false,
+
+			BaseMinMemory: 0,
+		},
+		abi.RegisteredProof_StackedDRG512MiBSeal: Resources{
+			MaxMemory: 1 << 20,
+			MinMemory: 1 << 20,
+
+			Threads: 0,
+			CanGPU:  false,
+
+			BaseMinMemory: 0,
+		},
+		abi.RegisteredProof_StackedDRG2KiBSeal: Resources{
+			MaxMemory: 1 << 20,
+			MinMemory: 1 << 20,
+
+			Threads: 0,
+			CanGPU:  false,
+
+			BaseMinMemory: 0,
+		},
+		abi.RegisteredProof_StackedDRG8MiBSeal: Resources{
+			MaxMemory: 1 << 20,
+			MinMemory: 1 << 20,
+
+			Threads: 0,
+			CanGPU:  false,
+
+			BaseMinMemory: 0,
+		},
+	},
 }
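In the resource table above, the Threads field replaces the old MultiThread flag: -1 means the task wants every core, 0 means it needs no dedicated thread (such as the new fetch task), and a positive value reserves that many threads. A small self-contained sketch of how that convention can translate into a concrete per-worker reservation (simplified types; the real accounting lives in activeResources.add in sched.go):

package main

import "fmt"

// resources mirrors the Threads convention introduced in the diff:
// -1 = take all cores, 0 = no dedicated thread, n > 0 = reserve n threads.
type resources struct {
	Threads int
}

func (r resources) multiThread() bool { return r.Threads == -1 }

// threadsNeeded converts the convention into a concrete reservation for a
// worker that exposes totalCPUs logical cores.
func threadsNeeded(r resources, totalCPUs uint64) uint64 {
	if r.multiThread() {
		return totalCPUs
	}
	return uint64(r.Threads)
}

func main() {
	fmt.Println(threadsNeeded(resources{Threads: -1}, 16)) // 16: whole machine (e.g. PreCommit2)
	fmt.Println(threadsNeeded(resources{Threads: 1}, 16))  // 1: single-core task (e.g. AddPiece)
	fmt.Println(threadsNeeded(resources{Threads: 0}, 16))  // 0: bookkeeping-only task (e.g. Fetch)
}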
sched.go | 439
@@ -1,6 +1,12 @@
 package sectorstorage
 
 import (
+	"container/list"
+	"context"
+	"sort"
+	"sync"
+
+	"github.com/hashicorp/go-multierror"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/specs-actors/actors/abi"
@@ -11,97 +17,166 @@ import (
 
 const mib = 1 << 20
 
+type WorkerAction func(ctx context.Context, w Worker) error
+
+type WorkerSelector interface {
+	Ok(ctx context.Context, task sealtasks.TaskType, a *workerHandle) (bool, error) // true if worker is acceptable for performing a task
+
+	Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) // true if a is preferred over b
+}
+
+type scheduler struct {
+	spt abi.RegisteredProof
+
+	workersLk  sync.Mutex
+	nextWorker WorkerID
+	workers    map[WorkerID]*workerHandle
+
+	newWorkers chan *workerHandle
+	schedule   chan *workerRequest
+	workerFree chan WorkerID
+	closing    chan struct{}
+
+	schedQueue *list.List // List[*workerRequest]
+}
+
+func newScheduler(spt abi.RegisteredProof) *scheduler {
+	return &scheduler{
+		spt: spt,
+
+		nextWorker: 0,
+		workers:    map[WorkerID]*workerHandle{},
+
+		newWorkers: make(chan *workerHandle),
+		schedule:   make(chan *workerRequest),
+		workerFree: make(chan WorkerID),
+		closing:    make(chan struct{}),
+
+		schedQueue: list.New(),
+	}
+}
+
+func (sh *scheduler) Schedule(ctx context.Context, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
+	ret := make(chan workerResponse)
+
+	select {
+	case sh.schedule <- &workerRequest{
+		taskType: taskType,
+		sel:      sel,
+
+		prepare: prepare,
+		work:    work,
+
+		ret: ret,
+		ctx: ctx,
+	}:
+	case <-sh.closing:
+		return xerrors.New("closing")
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	select {
+	case resp := <-ret:
+		return resp.err
+	case <-sh.closing:
+		return xerrors.New("closing")
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
 type workerRequest struct {
 	taskType sealtasks.TaskType
-	accept   []WorkerID // ordered by preference
+	sel      WorkerSelector
 
-	ret    chan<- workerResponse
-	cancel <-chan struct{}
+	prepare WorkerAction
+	work    WorkerAction
+
+	ret chan<- workerResponse
+	ctx context.Context
 }
 
 type workerResponse struct {
 	err error
-
-	worker Worker
-	done   func()
 }
 
-func (r *workerRequest) respond(resp workerResponse) {
+func (r *workerRequest) respond(err error) {
 	select {
-	case r.ret <- resp:
-	case <-r.cancel:
+	case r.ret <- workerResponse{err: err}:
+	case <-r.ctx.Done():
 		log.Warnf("request got cancelled before we could respond")
-		if resp.done != nil {
-			resp.done()
-		}
 	}
 }
 
+type activeResources struct {
+	memUsedMin uint64
+	memUsedMax uint64
+	gpuUsed    bool
+	cpuUse     uint64
+
+	cond *sync.Cond
+}
+
 type workerHandle struct {
 	w Worker
 
 	info storiface.WorkerInfo
 
-	memUsedMin uint64
-	memUsedMax uint64
-	gpuUsed    bool
-	cpuUse     int // -1 - multicore thing; 0 - free; 1+ - singlecore things
+	preparing *activeResources
+	active    *activeResources
 }
 
-func (m *Manager) runSched() {
+func (sh *scheduler) runSched() {
 	for {
 		select {
-		case w := <-m.newWorkers:
-			m.schedNewWorker(w)
-		case req := <-m.schedule:
-			resp, err := m.maybeSchedRequest(req)
+		case w := <-sh.newWorkers:
+			sh.schedNewWorker(w)
+		case req := <-sh.schedule:
+			scheduled, err := sh.maybeSchedRequest(req)
 			if err != nil {
-				req.respond(workerResponse{err: err})
+				req.respond(err)
+				continue
+			}
+			if scheduled {
 				continue
 			}
 
-			if resp != nil {
-				req.respond(*resp)
-				continue
-			}
-
-			m.schedQueue.PushBack(req)
-		case wid := <-m.workerFree:
-			m.onWorkerFreed(wid)
-		case <-m.closing:
-			m.schedClose()
+			sh.schedQueue.PushBack(req)
+		case wid := <-sh.workerFree:
+			sh.onWorkerFreed(wid)
+		case <-sh.closing:
+			sh.schedClose()
 			return
 		}
 	}
 }
 
-func (m *Manager) onWorkerFreed(wid WorkerID) {
-	for e := m.schedQueue.Front(); e != nil; e = e.Next() {
+func (sh *scheduler) onWorkerFreed(wid WorkerID) {
+	for e := sh.schedQueue.Front(); e != nil; e = e.Next() {
 		req := e.Value.(*workerRequest)
-		var ok bool
-		for _, id := range req.accept {
-			if id == wid {
-				ok = true
-				break
-			}
+		ok, err := req.sel.Ok(req.ctx, req.taskType, sh.workers[wid])
+		if err != nil {
+			log.Errorf("onWorkerFreed req.sel.Ok error: %+v", err)
+			continue
 		}
 
 		if !ok {
 			continue
 		}
 
-		resp, err := m.maybeSchedRequest(req)
+		scheduled, err := sh.maybeSchedRequest(req)
 		if err != nil {
-			req.respond(workerResponse{err: err})
+			req.respond(err)
 			continue
 		}
 
-		if resp != nil {
-			req.respond(*resp)
-
+		if scheduled {
 			pe := e.Prev()
-			m.schedQueue.Remove(e)
+			sh.schedQueue.Remove(e)
 			if pe == nil {
-				pe = m.schedQueue.Front()
+				pe = sh.schedQueue.Front()
 			}
 			if pe == nil {
 				break
@@ -112,147 +187,249 @@ func (m *Manager) onWorkerFreed(wid WorkerID) {
 		}
 	}
 }
 
-func (m *Manager) maybeSchedRequest(req *workerRequest) (*workerResponse, error) {
-	m.workersLk.Lock()
-	defer m.workersLk.Unlock()
+func (sh *scheduler) maybeSchedRequest(req *workerRequest) (bool, error) {
+	sh.workersLk.Lock()
+	defer sh.workersLk.Unlock()
 
 	tried := 0
+	var acceptable []WorkerID
+
+	needRes := ResourceTable[req.taskType][sh.spt]
+
+	for wid, worker := range sh.workers {
+		ok, err := req.sel.Ok(req.ctx, req.taskType, worker)
+		if err != nil {
+			return false, err
+		}
 
-	for i := len(req.accept) - 1; i >= 0; i-- {
-		id := req.accept[i]
-		w, ok := m.workers[id]
 		if !ok {
-			log.Warnf("requested worker %d is not in scheduler", id)
+			continue
 		}
 		tried++
 
-		canDo, err := m.canHandleRequest(id, w, req)
-		if err != nil {
-			return nil, err
-		}
-
-		if !canDo {
+		if !canHandleRequest(needRes, sh.spt, wid, worker.info.Resources, worker.preparing) {
 			continue
 		}
 
-		return m.makeResponse(id, w, req), nil
+		acceptable = append(acceptable, wid)
+	}
+
+	if len(acceptable) > 0 {
+		{
+			var serr error
+
+			sort.SliceStable(acceptable, func(i, j int) bool {
+				r, err := req.sel.Cmp(req.ctx, req.taskType, sh.workers[acceptable[i]], sh.workers[acceptable[j]])
+				if err != nil {
+					serr = multierror.Append(serr, err)
+				}
+				return r
+			})
+
+			if serr != nil {
+				return false, xerrors.Errorf("error(s) selecting best worker: %w", serr)
+			}
+		}
+
+		return true, sh.assignWorker(acceptable[0], sh.workers[acceptable[0]], req)
 	}
 
 	if tried == 0 {
-		return nil, xerrors.New("maybeSchedRequest didn't find any good workers")
+		return false, xerrors.New("maybeSchedRequest didn't find any good workers")
 	}
 
-	return nil, nil // put in waiting queue
+	return false, nil // put in waiting queue
 }
 
-func (m *Manager) makeResponse(wid WorkerID, w *workerHandle, req *workerRequest) *workerResponse {
-	needRes := ResourceTable[req.taskType][m.scfg.SealProofType]
+func (sh *scheduler) assignWorker(wid WorkerID, w *workerHandle, req *workerRequest) error {
+	needRes := ResourceTable[req.taskType][sh.spt]
 
-	w.gpuUsed = needRes.CanGPU
-	if needRes.MultiThread {
-		w.cpuUse = -1
-	} else {
-		if w.cpuUse != -1 {
-			w.cpuUse++
-		} else {
-			log.Warnf("sched: makeResponse for worker %d: worker cpu is in multicore use, but a single core task was scheduled", wid)
-		}
-	}
-
-	w.memUsedMin += needRes.MinMemory
-	w.memUsedMax += needRes.MaxMemory
-
-	return &workerResponse{
-		err:    nil,
-		worker: w.w,
-		done: func() {
-			m.workersLk.Lock()
-
-			if needRes.CanGPU {
-				w.gpuUsed = false
-			}
-
-			if needRes.MultiThread {
-				w.cpuUse = 0
-			} else if w.cpuUse != -1 {
-				w.cpuUse--
-			}
-
-			w.memUsedMin -= needRes.MinMemory
-			w.memUsedMax -= needRes.MaxMemory
-
-			m.workersLk.Unlock()
-
-			select {
-			case m.workerFree <- wid:
-			case <-m.closing:
-			}
-		},
-	}
+	w.preparing.add(w.info.Resources, needRes)
+
+	go func() {
+		err := req.prepare(req.ctx, w.w)
+		sh.workersLk.Lock()
+
+		if err != nil {
+			w.preparing.free(w.info.Resources, needRes)
+			sh.workersLk.Unlock()
+
+			select {
+			case sh.workerFree <- wid:
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+			}
+
+			select {
+			case req.ret <- workerResponse{err: err}:
+			case <-req.ctx.Done():
+				log.Warnf("request got cancelled before we could respond (prepare error: %+v)", err)
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response (prepare error: %+v)", err)
+			}
+			return
+		}
+
+		err = w.active.withResources(sh.spt, wid, w.info.Resources, needRes, &sh.workersLk, func() error {
+			w.preparing.free(w.info.Resources, needRes)
+			sh.workersLk.Unlock()
+			defer sh.workersLk.Lock() // we MUST return locked from this function
+
+			select {
+			case sh.workerFree <- wid:
+			case <-sh.closing:
+			}
+
+			err = req.work(req.ctx, w.w)
+
+			select {
+			case req.ret <- workerResponse{err: err}:
+			case <-req.ctx.Done():
+				log.Warnf("request got cancelled before we could respond")
+			case <-sh.closing:
+				log.Warnf("scheduler closed while sending response")
+			}
+
+			return nil
+		})
+
+		sh.workersLk.Unlock()
+
+		// This error should always be nil, since nothing is setting it, but just to be safe:
+		if err != nil {
+			log.Errorf("error executing worker (withResources): %+v", err)
+		}
+	}()
+
+	return nil
 }
 
-func (m *Manager) canHandleRequest(wid WorkerID, w *workerHandle, req *workerRequest) (bool, error) {
-	needRes, ok := ResourceTable[req.taskType][m.scfg.SealProofType]
-	if !ok {
-		return false, xerrors.Errorf("canHandleRequest: missing ResourceTable entry for %s/%d", req.taskType, m.scfg.SealProofType)
-	}
-
-	res := w.info.Resources
+func (a *activeResources) withResources(spt abi.RegisteredProof, id WorkerID, wr storiface.WorkerResources, r Resources, locker sync.Locker, cb func() error) error {
+	for !canHandleRequest(r, spt, id, wr, a) {
+		if a.cond == nil {
+			a.cond = sync.NewCond(locker)
+		}
+		a.cond.Wait()
+	}
+
+	a.add(wr, r)
+
+	err := cb()
+
+	a.free(wr, r)
+	if a.cond != nil {
+		a.cond.Broadcast()
+	}
+
+	return err
+}
+
+func (a *activeResources) add(wr storiface.WorkerResources, r Resources) {
+	a.gpuUsed = r.CanGPU
+	if r.MultiThread() {
+		a.cpuUse += wr.CPUs
+	} else {
+		a.cpuUse += uint64(r.Threads)
+	}
+
+	a.memUsedMin += r.MinMemory
+	a.memUsedMax += r.MaxMemory
+}
+
+func (a *activeResources) free(wr storiface.WorkerResources, r Resources) {
+	if r.CanGPU {
+		a.gpuUsed = false
+	}
+	if r.MultiThread() {
+		a.cpuUse -= wr.CPUs
+	} else {
+		a.cpuUse -= uint64(r.Threads)
+	}
+
+	a.memUsedMin -= r.MinMemory
+	a.memUsedMax -= r.MaxMemory
+}
+
+func canHandleRequest(needRes Resources, spt abi.RegisteredProof, wid WorkerID, res storiface.WorkerResources, active *activeResources) bool {
 
 	// TODO: dedupe needRes.BaseMinMemory per task type (don't add if that task is already running)
-	minNeedMem := res.MemReserved + w.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
+	minNeedMem := res.MemReserved + active.memUsedMin + needRes.MinMemory + needRes.BaseMinMemory
 	if minNeedMem > res.MemPhysical {
 		log.Debugf("sched: not scheduling on worker %d; not enough physical memory - need: %dM, have %dM", wid, minNeedMem/mib, res.MemPhysical/mib)
-		return false, nil
+		return false
 	}
 
-	maxNeedMem := res.MemReserved + w.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
-	if m.scfg.SealProofType == abi.RegisteredProof_StackedDRG32GiBSeal {
+	maxNeedMem := res.MemReserved + active.memUsedMax + needRes.MaxMemory + needRes.BaseMinMemory
+	if spt == abi.RegisteredProof_StackedDRG32GiBSeal {
 		maxNeedMem += MaxCachingOverhead
 	}
 	if maxNeedMem > res.MemSwap+res.MemPhysical {
 		log.Debugf("sched: not scheduling on worker %d; not enough virtual memory - need: %dM, have %dM", wid, maxNeedMem/mib, (res.MemSwap+res.MemPhysical)/mib)
-		return false, nil
+		return false
 	}
 
-	if needRes.MultiThread {
-		if w.cpuUse != 0 {
-			log.Debugf("sched: not scheduling on worker %d; multicore process needs free CPU", wid)
-			return false, nil
+	if needRes.MultiThread() {
+		if active.cpuUse > 0 {
+			log.Debugf("sched: not scheduling on worker %d; multicore process needs %d threads, %d in use, target %d", wid, res.CPUs, active.cpuUse, res.CPUs)
+			return false
 		}
 	} else {
-		if w.cpuUse == -1 {
-			log.Debugf("sched: not scheduling on worker %d; CPU in use by a multicore process", wid)
-			return false, nil
+		if active.cpuUse+uint64(needRes.Threads) > res.CPUs {
+			log.Debugf("sched: not scheduling on worker %d; not enough threads, need %d, %d in use, target %d", wid, needRes.Threads, active.cpuUse, res.CPUs)
+			return false
 		}
 	}
 
 	if len(res.GPUs) > 0 && needRes.CanGPU {
-		if w.gpuUsed {
+		if active.gpuUsed {
 			log.Debugf("sched: not scheduling on worker %d; GPU in use", wid)
-			return false, nil
+			return false
 		}
 	}
 
-	return true, nil
+	return true
 }
 
-func (m *Manager) schedNewWorker(w *workerHandle) {
-	m.workersLk.Lock()
-	defer m.workersLk.Unlock()
-
-	id := m.nextWorker
-	m.workers[id] = w
-	m.nextWorker++
+func (a *activeResources) utilization(wr storiface.WorkerResources) float64 {
+	var max float64
+
+	cpu := float64(a.cpuUse) / float64(wr.CPUs)
+	max = cpu
+
+	memMin := float64(a.memUsedMin+wr.MemReserved) / float64(wr.MemPhysical)
+	if memMin > max {
+		max = memMin
+	}
+
+	memMax := float64(a.memUsedMax+wr.MemReserved) / float64(wr.MemPhysical+wr.MemSwap)
+	if memMax > max {
+		max = memMax
+	}
+
+	return max
 }
 
-func (m *Manager) schedClose() {
-	m.workersLk.Lock()
-	defer m.workersLk.Unlock()
-
-	for i, w := range m.workers {
+func (sh *scheduler) schedNewWorker(w *workerHandle) {
+	sh.workersLk.Lock()
+	defer sh.workersLk.Unlock()
+
+	id := sh.nextWorker
+	sh.workers[id] = w
+	sh.nextWorker++
+}
+
+func (sh *scheduler) schedClose() {
+	sh.workersLk.Lock()
+	defer sh.workersLk.Unlock()
+
+	for i, w := range sh.workers {
 		if err := w.w.Close(); err != nil {
 			log.Errorf("closing worker %d: %+v", i, err)
 		}
 	}
 }
+
+func (sh *scheduler) Close() error {
+	close(sh.closing)
+	return nil
+}
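The scheduler above asks the request's WorkerSelector whether each worker is acceptable (Ok), filters out workers that fail canHandleRequest, and then stable-sorts the remainder with Cmp so the preferred worker ends up first. A compact, self-contained sketch of that pick-the-least-utilized step; handle and its utilization field are simplified stand-ins for workerHandle and activeResources.utilization:

package main

import (
	"fmt"
	"sort"
)

// handle is a simplified stand-in for the diff's workerHandle.
type handle struct {
	name        string
	utilization float64
}

// pickWorker mirrors the selection step in maybeSchedRequest: filter the
// acceptable workers first, then stable-sort them so the least utilized
// comes out in front.
func pickWorker(acceptable []handle) (handle, bool) {
	if len(acceptable) == 0 {
		return handle{}, false
	}
	sort.SliceStable(acceptable, func(i, j int) bool {
		return acceptable[i].utilization < acceptable[j].utilization
	})
	return acceptable[0], true
}

func main() {
	workers := []handle{
		{name: "miner", utilization: 0.8},
		{name: "remote-1", utilization: 0.2},
		{name: "remote-2", utilization: 0.5},
	}
	if best, ok := pickWorker(workers); ok {
		fmt.Println("schedule on", best.name) // remote-1
	}
}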
@@ -10,4 +10,6 @@ const (
 	TTCommit2 TaskType = "seal/v0/commit/2"
 
 	TTFinalize TaskType = "seal/v0/finalize"
+
+	TTFetch TaskType = "seal/v0/fetch"
 )
selector_alloc.go | 59 (new file)
@@ -0,0 +1,59 @@
+package sectorstorage
+
+import (
+	"context"
+
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/sector-storage/sealtasks"
+	"github.com/filecoin-project/sector-storage/stores"
+)
+
+type allocSelector struct {
+	best []stores.StorageInfo
+}
+
+func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType) (*allocSelector, error) {
+	best, err := index.StorageBestAlloc(ctx, alloc, true)
+	if err != nil {
+		return nil, err
+	}
+
+	return &allocSelector{
+		best: best,
+	}, nil
+}
+
+func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+	tasks, err := whnd.w.TaskTypes(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting supported worker task types: %w", err)
+	}
+	if _, supported := tasks[task]; !supported {
+		return false, nil
+	}
+
+	paths, err := whnd.w.Paths(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting worker paths: %w", err)
+	}
+
+	have := map[stores.ID]struct{}{}
+	for _, path := range paths {
+		have[path.ID] = struct{}{}
+	}
+
+	for _, info := range s.best {
+		if _, ok := have[info.ID]; ok {
+			return true, nil
+		}
+	}
+
+	return false, nil
+}
+
+func (s *allocSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
+	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+}
+
+var _ WorkerSelector = &allocSelector{}
selector_existing.go | 60 (new file)
@@ -0,0 +1,60 @@
+package sectorstorage
+
+import (
+	"context"
+
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/sector-storage/sealtasks"
+	"github.com/filecoin-project/sector-storage/stores"
+	"github.com/filecoin-project/specs-actors/actors/abi"
+)
+
+type existingSelector struct {
+	best []stores.StorageInfo
+}
+
+func newExistingSelector(ctx context.Context, index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) (*existingSelector, error) {
+	best, err := index.StorageFindSector(ctx, sector, alloc, allowFetch)
+	if err != nil {
+		return nil, err
+	}
+
+	return &existingSelector{
+		best: best,
+	}, nil
+}
+
+func (s *existingSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+	tasks, err := whnd.w.TaskTypes(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting supported worker task types: %w", err)
+	}
+	if _, supported := tasks[task]; !supported {
+		return false, nil
+	}
+
+	paths, err := whnd.w.Paths(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting worker paths: %w", err)
+	}
+
+	have := map[stores.ID]struct{}{}
+	for _, path := range paths {
+		have[path.ID] = struct{}{}
+	}
+
+	for _, info := range s.best {
+		if _, ok := have[info.ID]; ok {
+			return true, nil
+		}
+	}
+
+	return false, nil
+}
+
+func (s *existingSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
+	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+}
+
+var _ WorkerSelector = &existingSelector{}
selector_task.go | 46 (new file)
@@ -0,0 +1,46 @@
+package sectorstorage
+
+import (
+	"context"
+
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/sector-storage/sealtasks"
+	"github.com/filecoin-project/sector-storage/stores"
+)
+
+type taskSelector struct {
+	best []stores.StorageInfo
+}
+
+func newTaskSelector() *taskSelector {
+	return &taskSelector{}
+}
+
+func (s *taskSelector) Ok(ctx context.Context, task sealtasks.TaskType, whnd *workerHandle) (bool, error) {
+	tasks, err := whnd.w.TaskTypes(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting supported worker task types: %w", err)
+	}
+	_, supported := tasks[task]
+
+	return supported, nil
+}
+
+func (s *taskSelector) Cmp(ctx context.Context, _ sealtasks.TaskType, a, b *workerHandle) (bool, error) {
+	atasks, err := a.w.TaskTypes(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting supported worker task types: %w", err)
+	}
+	btasks, err := b.w.TaskTypes(ctx)
+	if err != nil {
+		return false, xerrors.Errorf("getting supported worker task types: %w", err)
+	}
+	if len(atasks) != len(btasks) {
+		return len(atasks) < len(btasks), nil // prefer workers which can do less
+	}
+
+	return a.active.utilization(a.info.Resources) < b.active.utilization(b.info.Resources), nil
+}
+
+var _ WorkerSelector = &allocSelector{}
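The three selectors above share the same Ok shape: confirm the worker advertises the task type, then check whether any of the preferred storage paths is among the paths the worker can reach. A self-contained sketch of that path-membership check; storageID and storageInfo are simplified stand-ins for stores.ID and stores.StorageInfo:

package main

import "fmt"

// storageID and storageInfo are simplified stand-ins for the stores package types.
type storageID string

type storageInfo struct{ ID storageID }

// okForPaths mirrors the shared Ok logic in the new selectors: build a set of
// the paths a worker can reach and accept it if any preferred path is in it.
func okForPaths(workerPaths []storageInfo, best []storageInfo) bool {
	have := map[storageID]struct{}{}
	for _, p := range workerPaths {
		have[p.ID] = struct{}{}
	}
	for _, info := range best {
		if _, ok := have[info.ID]; ok {
			return true
		}
	}
	return false
}

func main() {
	worker := []storageInfo{{ID: "path-a"}, {ID: "path-b"}}
	best := []storageInfo{{ID: "path-b"}}
	fmt.Println(okForPaths(worker, best)) // true: worker can reach a preferred path
}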
stats.go | 14
@@ -3,18 +3,18 @@ package sectorstorage
 import "github.com/filecoin-project/sector-storage/storiface"
 
 func (m *Manager) WorkerStats() map[uint64]storiface.WorkerStats {
-	m.workersLk.Lock()
-	defer m.workersLk.Unlock()
+	m.sched.workersLk.Lock()
+	defer m.sched.workersLk.Unlock()
 
 	out := map[uint64]storiface.WorkerStats{}
 
-	for id, handle := range m.workers {
+	for id, handle := range m.sched.workers {
 		out[uint64(id)] = storiface.WorkerStats{
 			Info:       handle.info,
-			MemUsedMin: handle.memUsedMin,
-			MemUsedMax: handle.memUsedMax,
-			GpuUsed:    handle.gpuUsed,
-			CpuUse:     handle.cpuUse,
+			MemUsedMin: handle.active.memUsedMin,
+			MemUsedMax: handle.active.memUsedMax,
+			GpuUsed:    handle.active.gpuUsed,
+			CpuUse:     handle.active.cpuUse,
 		}
 	}
 
@@ -63,6 +63,9 @@ func NewIndex() *Index {
 }
 
 func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
+	i.lk.RLock()
+	defer i.lk.RUnlock()
+
 	byID := map[ID]map[abi.SectorID]SectorFileType{}
 
 	for id := range i.stores {
@@ -274,11 +277,9 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
 
 	for _, p := range i.stores {
 		if sealing && !p.info.CanSeal {
-			log.Debugf("alloc: not considering %s; can't seal", p.info.ID)
 			continue
 		}
 		if !sealing && !p.info.CanStore {
-			log.Debugf("alloc: not considering %s; can't store", p.info.ID)
 			continue
 		}
 
@@ -27,9 +27,8 @@ type Remote struct {
 	index SectorIndex
 	auth  http.Header
 
-	fetchLk sync.Mutex // TODO: this can be much smarter
-	// TODO: allow multiple parallel fetches
-	// (make sure to not fetch the same sector data twice)
+	fetchLk  sync.Mutex
+	fetching map[abi.SectorID]chan struct{}
 }
 
 func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
@@ -37,6 +36,8 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header) *Remote {
 		local: local,
 		index: index,
 		auth:  auth,
+
+		fetching: map[abi.SectorID]chan struct{}{},
 	}
 }
 
@@ -45,8 +46,32 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, existing Sec
 		return SectorPaths{}, SectorPaths{}, nil, xerrors.New("can't both find and allocate a sector")
 	}
 
-	r.fetchLk.Lock()
-	defer r.fetchLk.Unlock()
+	for {
+		r.fetchLk.Lock()
+
+		c, locked := r.fetching[s]
+		if !locked {
+			r.fetching[s] = make(chan struct{})
+			r.fetchLk.Unlock()
+			break
+		}
+
+		r.fetchLk.Unlock()
+
+		select {
+		case <-c:
+			continue
+		case <-ctx.Done():
+			return SectorPaths{}, SectorPaths{}, nil, ctx.Err()
+		}
+	}
+
+	defer func() {
+		r.fetchLk.Lock()
+		close(r.fetching[s])
+		delete(r.fetching, s)
+		r.fetchLk.Unlock()
+	}()
+
 	paths, stores, done, err := r.local.AcquireSector(ctx, s, existing, allocate, sealing)
 	if err != nil {
@@ -92,7 +117,11 @@ func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType
 		return "", "", "", nil, err
 	}
 
+	if len(si) == 0 {
+		return "", "", "", nil, xerrors.Errorf("failed to acquire sector %v from remote(%d): not found", s, fileType)
+	}
+
 	sort.Slice(si, func(i, j int) bool {
 		return si[i].Weight < si[j].Weight
 	})
 
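The change to Remote.AcquireSector above deduplicates concurrent fetches of the same sector: the first caller installs a channel in the fetching map, later callers block on that channel, and it is closed and removed once the fetch completes. A self-contained sketch of the pattern with a generic string key (simplified; the real code keys on abi.SectorID and returns sector paths):

package main

import (
	"context"
	"fmt"
	"sync"
)

// fetcher sketches the dedup pattern: one in-flight fetch per key, with later
// callers waiting on a channel that is closed when the first fetch finishes.
type fetcher struct {
	lk       sync.Mutex
	fetching map[string]chan struct{}
}

func (f *fetcher) acquire(ctx context.Context, key string, fetch func() error) error {
	for {
		f.lk.Lock()
		c, busy := f.fetching[key]
		if !busy {
			f.fetching[key] = make(chan struct{})
			f.lk.Unlock()
			break
		}
		f.lk.Unlock()

		select {
		case <-c: // the first fetch finished; re-check the map
			continue
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	defer func() {
		f.lk.Lock()
		close(f.fetching[key])
		delete(f.fetching, key)
		f.lk.Unlock()
	}()

	return fetch()
}

func main() {
	f := &fetcher{fetching: map[string]chan struct{}{}}
	err := f.acquire(context.Background(), "sector-1", func() error {
		fmt.Println("fetching sector-1 once")
		return nil
	})
	fmt.Println("done:", err)
}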
@@ -12,6 +12,7 @@ type WorkerResources struct {
 
 	MemReserved uint64 // Used by system / other processes
 
+	CPUs uint64 // Logical cores
 	GPUs []string
 }
 
@@ -21,5 +22,5 @@ type WorkerStats struct {
 	MemUsedMin uint64
 	MemUsedMax uint64
 	GpuUsed    bool
-	CpuUse     int
+	CpuUse     uint64
 }