Merge pull request #7334 from filecoin-project/chore/sched-worklk
storagemgr: Cleanup workerLk around worker resources
This commit is contained in:
commit
6df17bceb1
8
extern/sector-storage/sched.go
vendored
8
extern/sector-storage/sched.go
vendored
@ -78,12 +78,12 @@ type workerHandle struct {
|
||||
|
||||
info storiface.WorkerInfo
|
||||
|
||||
preparing *activeResources
|
||||
active *activeResources
|
||||
preparing *activeResources // use with workerHandle.lk
|
||||
active *activeResources // use with workerHandle.lk
|
||||
|
||||
lk sync.Mutex
|
||||
lk sync.Mutex // can be taken inside sched.workersLk.RLock
|
||||
|
||||
wndLk sync.Mutex
|
||||
wndLk sync.Mutex // can be taken inside sched.workersLk.RLock
|
||||
activeWindows []*schedWindow
|
||||
|
||||
enabled bool
|
||||
|
12
extern/sector-storage/sched_worker.go
vendored
12
extern/sector-storage/sched_worker.go
vendored
@ -399,13 +399,11 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
go func() {
|
||||
// first run the prepare step (e.g. fetching sector data from other worker)
|
||||
err := req.prepare(req.ctx, sh.workTracker.worker(sw.wid, w.info, w.workerRpc))
|
||||
sh.workersLk.Lock()
|
||||
w.lk.Lock()
|
||||
|
||||
if err != nil {
|
||||
w.lk.Lock()
|
||||
w.preparing.free(w.info.Resources, needRes)
|
||||
w.lk.Unlock()
|
||||
sh.workersLk.Unlock()
|
||||
|
||||
select {
|
||||
case taskDone <- struct{}{}:
|
||||
@ -424,12 +422,10 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
}
|
||||
|
||||
// wait (if needed) for resources in the 'active' window
|
||||
err = w.active.withResources(sw.wid, w.info, needRes, &sh.workersLk, func() error {
|
||||
w.lk.Lock()
|
||||
err = w.active.withResources(sw.wid, w.info, needRes, &w.lk, func() error {
|
||||
w.preparing.free(w.info.Resources, needRes)
|
||||
w.lk.Unlock()
|
||||
sh.workersLk.Unlock()
|
||||
defer sh.workersLk.Lock() // we MUST return locked from this function
|
||||
defer w.lk.Lock() // we MUST return locked from this function
|
||||
|
||||
select {
|
||||
case taskDone <- struct{}{}:
|
||||
@ -450,7 +446,7 @@ func (sw *schedWorker) startProcessingTask(taskDone chan struct{}, req *workerRe
|
||||
return nil
|
||||
})
|
||||
|
||||
sh.workersLk.Unlock()
|
||||
w.lk.Unlock()
|
||||
|
||||
// This error should always be nil, since nothing is setting it, but just to be safe:
|
||||
if err != nil {
|
||||
|
2
extern/sector-storage/stats.go
vendored
2
extern/sector-storage/stats.go
vendored
@ -15,6 +15,7 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
|
||||
out := map[uuid.UUID]storiface.WorkerStats{}
|
||||
|
||||
for id, handle := range m.sched.workers {
|
||||
handle.lk.Lock()
|
||||
out[uuid.UUID(id)] = storiface.WorkerStats{
|
||||
Info: handle.info,
|
||||
Enabled: handle.enabled,
|
||||
@ -24,6 +25,7 @@ func (m *Manager) WorkerStats() map[uuid.UUID]storiface.WorkerStats {
|
||||
GpuUsed: handle.active.gpuUsed,
|
||||
CpuUse: handle.active.cpuUse,
|
||||
}
|
||||
handle.lk.Unlock()
|
||||
}
|
||||
|
||||
return out
|
||||
|
Loading…
Reference in New Issue
Block a user