From ed251d9f82123e86cab87adf57a60ea8d3762e26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 3 Aug 2020 14:18:11 +0200 Subject: [PATCH 1/2] Fix some locking issues --- sched.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/sched.go b/sched.go index ed648bf19..e8eda4834 100644 --- a/sched.go +++ b/sched.go @@ -52,7 +52,7 @@ type WorkerSelector interface { type scheduler struct { spt abi.RegisteredSealProof - workersLk sync.Mutex + workersLk sync.RWMutex nextWorker WorkerID workers map[WorkerID]*workerHandle @@ -83,6 +83,8 @@ type workerHandle struct { preparing *activeResources active *activeResources + lk sync.Mutex + // stats / tracking wt *workTracker @@ -283,6 +285,9 @@ func (sh *scheduler) trySched() { log.Debugf("SCHED %d queued; %d open windows", sh.schedQueue.Len(), len(windows)) + sh.workersLk.RLock() + defer sh.workersLk.RUnlock() + // Step 1 for sqi := 0; sqi < sh.schedQueue.Len(); sqi++ { task := (*sh.schedQueue)[sqi] @@ -428,9 +433,9 @@ func (sh *scheduler) runWorker(wid WorkerID) { defer ready.Wait() go func() { - sh.workersLk.Lock() + sh.workersLk.RLock() worker, found := sh.workers[wid] - sh.workersLk.Unlock() + sh.workersLk.RUnlock() ready.Done() @@ -498,16 +503,19 @@ func (sh *scheduler) runWorker(wid WorkerID) { todo := activeWindows[0].todo[0] needRes := ResourceTable[todo.taskType][sh.spt] - sh.workersLk.Lock() + sh.workersLk.RLock() + worker.lk.Lock() ok := worker.preparing.canHandleRequest(needRes, wid, worker.info.Resources) + worker.lk.Unlock() + if !ok { - sh.workersLk.Unlock() + sh.workersLk.RUnlock() break assignLoop } log.Debugf("assign worker sector %d", todo.sector.Number) err := sh.assignWorker(taskDone, wid, worker, todo) - sh.workersLk.Unlock() + sh.workersLk.RUnlock() if err != nil { log.Error("assignWorker error: %+v", err) @@ -530,14 +538,18 @@ func (sh *scheduler) runWorker(wid WorkerID) { func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *workerHandle, req *workerRequest) error { needRes := ResourceTable[req.taskType][sh.spt] + w.lk.Lock() w.preparing.add(w.info.Resources, needRes) + w.lk.Unlock() go func() { err := req.prepare(req.ctx, w.wt.worker(w.w)) sh.workersLk.Lock() if err != nil { + w.lk.Lock() w.preparing.free(w.info.Resources, needRes) + w.lk.Unlock() sh.workersLk.Unlock() select { @@ -557,7 +569,9 @@ func (sh *scheduler) assignWorker(taskDone chan struct{}, wid WorkerID, w *worke } err = w.active.withResources(wid, w.info.Resources, needRes, &sh.workersLk, func() error { + w.lk.Lock() w.preparing.free(w.info.Resources, needRes) + w.lk.Unlock() sh.workersLk.Unlock() defer sh.workersLk.Lock() // we MUST return locked from this function From 3cab915fd225717efd7fc9b099b854e11af2d056 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 3 Aug 2020 20:49:04 +0200 Subject: [PATCH 2/2] mock: Make it possible to unfail sectors --- mock/mock.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mock/mock.go b/mock/mock.go index 55b103ab8..6eb71cd6b 100644 --- a/mock/mock.go +++ b/mock/mock.go @@ -236,7 +236,7 @@ func (mgr *SectorMgr) SealCommit2(ctx context.Context, sid abi.SectorID, phase1O // Test Instrumentation Methods -func (mgr *SectorMgr) FailSector(sid abi.SectorID) error { +func (mgr *SectorMgr) MarkFailed(sid abi.SectorID, failed bool) error { mgr.lk.Lock() defer mgr.lk.Unlock() ss, ok := mgr.sectors[sid] @@ -244,7 +244,7 @@ func (mgr *SectorMgr) FailSector(sid abi.SectorID) error { return fmt.Errorf("no such sector in storage") } - ss.failed = true + ss.failed = failed return nil }