worker sched: Separate resource def for preparing window
This commit is contained in:
parent
2316363f7a
commit
c484c38735
@ -289,14 +289,20 @@ func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
m.remoteHnd.ServeHTTP(w, r)
|
m.remoteHnd.ServeHTTP(w, r)
|
||||||
}
|
}
|
||||||
|
|
||||||
func schedNop(context.Context, Worker) error {
|
var schedNop = PrepareAction{
|
||||||
return nil
|
Action: func(ctx context.Context, w Worker) error {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
PrepType: sealtasks.TTNoop,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
|
func (m *Manager) schedFetch(sector storiface.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) PrepareAction {
|
||||||
return func(ctx context.Context, worker Worker) error {
|
return PrepareAction{
|
||||||
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
Action: func(ctx context.Context, worker Worker) error {
|
||||||
return err
|
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, ft, ptype, am))
|
||||||
|
return err
|
||||||
|
},
|
||||||
|
PrepType: sealtasks.TTFetch,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -315,16 +321,19 @@ func (m *Manager) SectorsUnsealPiece(ctx context.Context, sector storiface.Secto
|
|||||||
|
|
||||||
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
|
// if the selected worker does NOT have the sealed files for the sector, instruct it to fetch it from a worker that has them and
|
||||||
// put it in the sealing scratch space.
|
// put it in the sealing scratch space.
|
||||||
sealFetch := func(ctx context.Context, worker Worker) error {
|
sealFetch := PrepareAction{
|
||||||
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
|
Action: func(ctx context.Context, worker Worker) error {
|
||||||
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
|
log.Debugf("copy sealed/cache sector data for sector %d", sector.ID)
|
||||||
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
|
_, err := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy))
|
||||||
|
_, err2 := m.waitSimpleCall(ctx)(worker.Fetch(ctx, sector, storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathSealing, storiface.AcquireCopy))
|
||||||
|
|
||||||
if err != nil && err2 != nil {
|
if err != nil && err2 != nil {
|
||||||
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
|
return xerrors.Errorf("cannot unseal piece. error fetching sealed data: %w. error fetching replica data: %w", err, err2)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
},
|
||||||
|
PrepType: sealtasks.TTFetch,
|
||||||
}
|
}
|
||||||
|
|
||||||
if unsealed == nil {
|
if unsealed == nil {
|
||||||
|
@ -42,6 +42,10 @@ func WithPriority(ctx context.Context, priority int) context.Context {
|
|||||||
const mib = 1 << 20
|
const mib = 1 << 20
|
||||||
|
|
||||||
type WorkerAction func(ctx context.Context, w Worker) error
|
type WorkerAction func(ctx context.Context, w Worker) error
|
||||||
|
// PrepareAction bundles the action run in a request's prepare step with the
// task type under which that step is accounted in the preparing window.
type PrepareAction struct {
|
||||||
|
// Action is the callback invoked with the worker to perform the prepare step.
Action WorkerAction
|
||||||
|
// PrepType is the task type used to build the prepare step's SealTaskType
// and to look up its resource spec.
PrepType sealtasks.TaskType
|
||||||
|
}
|
||||||
|
|
||||||
type SchedWorker interface {
|
type SchedWorker interface {
|
||||||
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
|
TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error)
|
||||||
@ -130,7 +134,7 @@ type WorkerRequest struct {
|
|||||||
Sel WorkerSelector
|
Sel WorkerSelector
|
||||||
SchedId uuid.UUID
|
SchedId uuid.UUID
|
||||||
|
|
||||||
prepare WorkerAction
|
prepare PrepareAction
|
||||||
work WorkerAction
|
work WorkerAction
|
||||||
|
|
||||||
start time.Time
|
start time.Time
|
||||||
@ -197,7 +201,7 @@ func newScheduler(ctx context.Context, assigner string) (*Scheduler, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare WorkerAction, work WorkerAction) error {
|
func (sh *Scheduler) Schedule(ctx context.Context, sector storiface.SectorRef, taskType sealtasks.TaskType, sel WorkerSelector, prepare PrepareAction, work WorkerAction) error {
|
||||||
ret := make(chan workerResponse)
|
ret := make(chan workerResponse)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -247,6 +251,13 @@ func (r *WorkerRequest) SealTask() sealtasks.SealTaskType {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrepSealTask returns the SealTaskType describing the request's prepare
// step: the prepare action's PrepType paired with the sector's proof type.
func (r *WorkerRequest) PrepSealTask() sealtasks.SealTaskType {
|
||||||
|
return sealtasks.SealTaskType{
|
||||||
|
// task type of the prepare action, not of the request itself
TaskType: r.prepare.PrepType,
|
||||||
|
RegisteredSealProof: r.Sector.ProofType,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type SchedDiagRequestInfo struct {
|
type SchedDiagRequestInfo struct {
|
||||||
Sector abi.SectorID
|
Sector abi.SectorID
|
||||||
TaskType sealtasks.TaskType
|
TaskType sealtasks.TaskType
|
||||||
|
@ -288,25 +288,30 @@ func TestSched(t *testing.T) {
|
|||||||
ProofType: spt,
|
ProofType: spt,
|
||||||
}
|
}
|
||||||
|
|
||||||
err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error {
|
prep := PrepareAction{
|
||||||
wi, err := w.Info(ctx)
|
Action: func(ctx context.Context, w Worker) error {
|
||||||
require.NoError(t, err)
|
wi, err := w.Info(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, expectWorker, wi.Hostname)
|
require.Equal(t, expectWorker, wi.Hostname)
|
||||||
|
|
||||||
log.Info("IN ", taskName)
|
log.Info("IN ", taskName)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
_, ok := <-done
|
_, ok := <-done
|
||||||
if !ok {
|
if !ok {
|
||||||
break
|
break
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("OUT ", taskName)
|
log.Info("OUT ", taskName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}, noopAction)
|
},
|
||||||
|
PrepType: taskType,
|
||||||
|
}
|
||||||
|
|
||||||
|
err := sched.Schedule(ctx, sectorRef, taskType, sel, prep, noopAction)
|
||||||
if err != context.Canceled {
|
if err != context.Canceled {
|
||||||
require.NoError(t, err, fmt.Sprint(l, l2))
|
require.NoError(t, err, fmt.Sprint(l, l2))
|
||||||
}
|
}
|
||||||
|
@ -354,8 +354,8 @@ assignLoop:
|
|||||||
|
|
||||||
worker.lk.Lock()
|
worker.lk.Lock()
|
||||||
for t, todo := range firstWindow.Todo {
|
for t, todo := range firstWindow.Todo {
|
||||||
needRes := worker.Info.Resources.ResourceSpec(todo.Sector.ProofType, todo.TaskType)
|
needResPrep := worker.Info.Resources.PrepResourceSpec(todo.Sector.ProofType, todo.TaskType, todo.prepare.PrepType)
|
||||||
if worker.preparing.CanHandleRequest(todo.SealTask(), needRes, sw.wid, "startPreparing", worker.Info) {
|
if worker.preparing.CanHandleRequest(todo.PrepSealTask(), needResPrep, sw.wid, "startPreparing", worker.Info) {
|
||||||
tidx = t
|
tidx = t
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -454,20 +454,21 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
|
|||||||
w, sh := sw.worker, sw.sched
|
w, sh := sw.worker, sw.sched
|
||||||
|
|
||||||
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
|
needRes := w.Info.Resources.ResourceSpec(req.Sector.ProofType, req.TaskType)
|
||||||
|
needResPrep := w.Info.Resources.PrepResourceSpec(req.Sector.ProofType, req.TaskType, req.prepare.PrepType)
|
||||||
|
|
||||||
w.lk.Lock()
|
w.lk.Lock()
|
||||||
w.preparing.Add(req.SealTask(), w.Info.Resources, needRes)
|
w.preparing.Add(req.PrepSealTask(), w.Info.Resources, needResPrep)
|
||||||
w.lk.Unlock()
|
w.lk.Unlock()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
// first run the prepare step (e.g. fetching sector data from other worker)
|
// first run the prepare step (e.g. fetching sector data from other worker)
|
||||||
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
|
tw := sh.workTracker.worker(sw.wid, w.Info, w.workerRpc)
|
||||||
tw.start()
|
tw.start()
|
||||||
err := req.prepare(req.Ctx, tw)
|
err := req.prepare.Action(req.Ctx, tw)
|
||||||
w.lk.Lock()
|
w.lk.Lock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
|
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
|
||||||
w.lk.Unlock()
|
w.lk.Unlock()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -497,7 +498,7 @@ func (sw *schedWorker) startProcessingTask(req *WorkerRequest) error {
|
|||||||
|
|
||||||
// wait (if needed) for resources in the 'active' window
|
// wait (if needed) for resources in the 'active' window
|
||||||
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
|
err = w.active.withResources(sw.wid, w.Info, req.SealTask(), needRes, &w.lk, func() error {
|
||||||
w.preparing.Free(req.SealTask(), w.Info.Resources, needRes)
|
w.preparing.Free(req.PrepSealTask(), w.Info.Resources, needResPrep)
|
||||||
w.lk.Unlock()
|
w.lk.Unlock()
|
||||||
defer w.lk.Lock() // we MUST return locked from this function
|
defer w.lk.Lock() // we MUST return locked from this function
|
||||||
|
|
||||||
|
@ -36,6 +36,8 @@ const (
|
|||||||
|
|
||||||
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
|
TTGenerateWindowPoSt TaskType = "post/v0/windowproof"
|
||||||
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
|
TTGenerateWinningPoSt TaskType = "post/v0/winningproof"
|
||||||
|
|
||||||
|
TTNoop TaskType = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
var order = map[TaskType]int{
|
var order = map[TaskType]int{
|
||||||
|
@ -65,6 +65,20 @@ func (wr WorkerResources) ResourceSpec(spt abi.RegisteredSealProof, tt sealtasks
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PrepResourceSpec is like ResourceSpec, but meant for use limiting parallel preparing
|
||||||
|
// tasks. It starts from the spec for (spt, tt); when prepTT is a different,
// non-noop task type, only MaxConcurrent is replaced by the value from the
// spec for (spt, prepTT). All other fields stay those of tt.
|
||||||
|
func (wr WorkerResources) PrepResourceSpec(spt abi.RegisteredSealProof, tt, prepTT sealtasks.TaskType) Resources {
|
||||||
|
res := wr.ResourceSpec(spt, tt)
|
||||||
|
|
||||||
|
// override the concurrency limit only when the prepare step has its own,
// distinct task type
if prepTT != tt && prepTT != sealtasks.TTNoop {
|
||||||
|
prepRes := wr.ResourceSpec(spt, prepTT)
|
||||||
|
res.MaxConcurrent = prepRes.MaxConcurrent
|
||||||
|
}
|
||||||
|
|
||||||
|
// otherwise res is the unmodified ResourceSpec result for tt
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
type WorkerStats struct {
|
type WorkerStats struct {
|
||||||
Info WorkerInfo
|
Info WorkerInfo
|
||||||
Tasks []sealtasks.TaskType
|
Tasks []sealtasks.TaskType
|
||||||
|
Loading…
Reference in New Issue
Block a user