add ability to ignore worker resources when scheduling.

This commit is contained in:
Raúl Kripalani 2021-06-21 19:35:47 +01:00
parent fa2b247f75
commit f3b6f8de1a
5 changed files with 49 additions and 18 deletions

View File

@ -96,6 +96,10 @@ type SealerConfig struct {
AllowPreCommit2 bool
AllowCommit bool
AllowUnseal bool
// IgnoreResourceFiltering instructs the system to ignore available
// resources when assigning tasks to the local worker.
IgnoreResourceFiltering bool
}
type StorageAuth http.Header

View File

@ -349,24 +349,24 @@ func (sh *scheduler) trySched() {
defer sh.workersLk.RUnlock()
windowsLen := len(sh.openWindows)
queuneLen := sh.schedQueue.Len()
queueLen := sh.schedQueue.Len()
log.Debugf("SCHED %d queued; %d open windows", queuneLen, windowsLen)
log.Debugf("SCHED %d queued; %d open windows", queueLen, windowsLen)
if windowsLen == 0 || queuneLen == 0 {
if windowsLen == 0 || queueLen == 0 {
// nothing to schedule on
return
}
windows := make([]schedWindow, windowsLen)
acceptableWindows := make([][]int, queuneLen)
acceptableWindows := make([][]int, queueLen)
// Step 1
throttle := make(chan struct{}, windowsLen)
var wg sync.WaitGroup
wg.Add(queuneLen)
for i := 0; i < queuneLen; i++ {
wg.Add(queueLen)
for i := 0; i < queueLen; i++ {
throttle <- struct{}{}
go func(sqi int) {
@ -393,7 +393,8 @@ func (sh *scheduler) trySched() {
}
// TODO: allow bigger windows
if !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
ignoringResources := worker.info.IgnoreResources
if !ignoringResources && !windows[wnd].allocated.canHandleRequest(needRes, windowRequest.worker, "schedAcceptable", worker.info.Resources) {
continue
}
@ -451,9 +452,9 @@ func (sh *scheduler) trySched() {
// Step 2
scheduled := 0
rmQueue := make([]int, 0, queuneLen)
rmQueue := make([]int, 0, queueLen)
for sqi := 0; sqi < queuneLen; sqi++ {
for sqi := 0; sqi < queueLen; sqi++ {
task := (*sh.schedQueue)[sqi]
needRes := ResourceTable[task.taskType][task.sector.ProofType]

View File

@ -18,7 +18,12 @@ import (
type WorkerInfo struct {
Hostname string
Resources WorkerResources
// IgnoreResources indicates whether the worker's available resources should
// be used ignored (true) or used (false) for the purposes of scheduling and
// task assignment. Only supported on local workers. Used for testing.
// Default should be false (zero value, i.e. resources taken into account).
IgnoreResources bool
Resources WorkerResources
}
type WorkerResources struct {

View File

@ -20,7 +20,7 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statestore"
storage "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -33,6 +33,11 @@ var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSea
type WorkerConfig struct {
TaskTypes []sealtasks.TaskType
NoSwap bool
// IgnoreResourceFiltering enables task distribution to happen on this
// worker regardless of its currently available resources. Used in testing
// with the local worker.
IgnoreResourceFiltering bool
}
// used do provide custom proofs impl (mostly used in testing)
@ -46,6 +51,9 @@ type LocalWorker struct {
executor ExecutorFunc
noSwap bool
// see equivalent field on WorkerConfig.
ignoreResources bool
ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
@ -71,12 +79,12 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
ct: &workerCallTracker{
st: cst,
},
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
session: uuid.New(),
closing: make(chan struct{}),
acceptTasks: acceptTasks,
executor: executor,
noSwap: wcfg.NoSwap,
ignoreResources: wcfg.IgnoreResourceFiltering,
session: uuid.New(),
closing: make(chan struct{}),
}
if w.executor == nil {
@ -501,7 +509,8 @@ func (l *LocalWorker) Info(context.Context) (storiface.WorkerInfo, error) {
}
return storiface.WorkerInfo{
Hostname: hostname,
Hostname: hostname,
IgnoreResources: l.ignoreResources,
Resources: storiface.WorkerResources{
MemPhysical: mem.Total,
MemSwap: memSwap,

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
@ -127,6 +128,12 @@ func CreateTestStorageNode(ctx context.Context, t *testing.T, waddr address.Addr
node.Override(new(v1api.FullNode), tnd),
node.Override(new(*lotusminer.Miner), lotusminer.NewTestMiner(mineBlock, act)),
node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig {
scfg := config.DefaultStorageMiner()
scfg.Storage.IgnoreResourceFiltering = true
return &scfg.Storage
}),
opts,
)
if err != nil {
@ -532,6 +539,11 @@ func mockMinerBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []Stora
node.Override(new(sectorstorage.SectorManager), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.Unsealer), node.From(new(*mock.SectorMgr))),
node.Override(new(sectorstorage.PieceProvider), node.From(new(*mock.SectorMgr))),
node.Override(new(*sectorstorage.SealerConfig), func() *sectorstorage.SealerConfig {
scfg := config.DefaultStorageMiner()
scfg.Storage.IgnoreResourceFiltering = true
return &scfg.Storage
}),
node.Override(new(ffiwrapper.Verifier), mock.MockVerifier),
node.Override(new(ffiwrapper.Prover), mock.MockProver),