Merge pull request #89 from filecoin-project/fix/locking

Fix some locking issues
This commit is contained in:
Łukasz Magiera 2020-08-04 00:37:49 +02:00 committed by GitHub
commit ad9a691e0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 8 deletions

View File

@ -236,7 +236,7 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1O
// Test Instrumentation Methods // Test Instrumentation Methods
func (mgr *SectorMgr) FailSector(sid abi.SectorID) error { func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error {
mgr.lk.Lock() mgr.lk.Lock()
defer mgr.lk.Unlock() defer mgr.lk.Unlock()
ss, ok := mgr.sectors[sid] ss, ok := mgr.sectors[sid]
@ -244,7 +244,7 @@ func (mgr *SectorMgr) FailSector(sid abi.SectorID) error {
return fmt.Errorf("no such sector in storage") return fmt.Errorf("no such sector in storage")
} }
ss.failed = true ss.failed = failed
return nil return nil
} }

View File

@ -52,7 +52,7 @@ type WorkerSelector interface {
type scheduler struct { type scheduler struct {
spt abi.RegisteredSealProof spt abi.RegisteredSealProof
workersLk sync.Mutex workersLk sync.RWMutex
nextWorker WorkerID nextWorker WorkerID
workers map[WorkerID]*workerHandle workers map[WorkerID]*workerHandle
@ -83,6 +83,8 @@ type workerHandle struct {
preparing *activeResources preparing *activeResources
active *activeResources active *activeResources
lk sync.Mutex
// stats / tracking // stats / tracking
wt *workTracker wt *workTracker
@ -283,6 +285,9 @@ func (sh *scheduler) trySched() {
log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows))
sh.workersLk.RLock()
defer sh.workersLk.RUnlock()
// Step 1 // Step 1
for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ {
task := (*sh.schedQueue)[sqi] task := (*sh.schedQueue)[sqi]
@ -428,9 +433,9 @@ func (sh *scheduler) runWorker(wid WorkerID) {
defer ready.Wait() defer ready.Wait()
go func() { go func() {
sh.workersLk.Lock() sh.workersLk.RLock()
worker, found := sh.workers[wid] worker, found := sh.workers[wid]
sh.workersLk.Unlock() sh.workersLk.RUnlock()
ready.Done() ready.Done()
@ -498,16 +503,19 @@ func (sh *scheduler) runWorker(wid WorkerID) {
todo := activeWindows[0].todo[0] todo := activeWindows[0].todo[0]
needRes := ResourceTable[todo.taskType][sh.spt] needRes := ResourceTable[todo.taskType][sh.spt]
sh.workersLk.Lock() sh.workersLk.RLock()
worker.lk.Lock()
ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources)
worker.lk.Unlock()
if !ok { if !ok {
sh.workersLk.Unlock() sh.workersLk.RUnlock()
break assignLoop break assignLoop
} }
log.Debugf("assign worker sector %d", todo.sector.Number) log.Debugf("assign worker sector %d", todo.sector.Number)
err := sh.assignWorker(taskDone, wid, worker, todo) err := sh.assignWorker(taskDone, wid, worker, todo)
sh.workersLk.Unlock() sh.workersLk.RUnlock()
if err != nil { if err != nil {
log.Error("assignWorker error: %+v", err) log.Error("assignWorker error: %+v", err)
@ -530,14 +538,18 @@ func (sh *scheduler) runWorker(wid WorkerID) {
func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error {
needRes := ResourceTable[req.taskType][sh.spt] needRes := ResourceTable[req.taskType][sh.spt]
w.lk.Lock()
w.preparing.add(w.info.Resources, needRes) w.preparing.add(w.info.Resources, needRes)
w.lk.Unlock()
go func() { go func() {
err := req.prepare(req.ctx, w.wt.worker(w.w)) err := req.prepare(req.ctx, w.wt.worker(w.w))
sh.workersLk.Lock() sh.workersLk.Lock()
if err != nil { if err != nil {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes) w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock() sh.workersLk.Unlock()
select { select {
@ -557,7 +569,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke
} }
err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error { err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error {
w.lk.Lock()
w.preparing.free(w.info.Resources, needRes) w.preparing.free(w.info.Resources, needRes)
w.lk.Unlock()
sh.workersLk.Unlock() sh.workersLk.Unlock()
defer sh.workersLk.Lock() // we MUST return locked from this function defer sh.workersLk.Lock() // we MUST return locked from this function