From fbc05fa5a86c7a90b4f0997f7fe14dd1701cc8db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 9 Nov 2020 21:43:52 +0100 Subject: [PATCH 1/2] stores: Report storage changes quickly --- extern/sector-storage/stores/local.go | 34 +++++++++++++++++---------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 89c22bd99..f0e487da7 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -298,24 +298,28 @@ func (st *Local) reportHealth(ctx context.Context) { return } - st.localLk.RLock() + st.reportStorage(ctx) + } +} - toReport := map[ID]HealthReport{} - for id, p := range st.paths { - stat, err := p.stat(st.localStorage) +func (st *Local) reportStorage(ctx context.Context) { + st.localLk.RLock() - toReport[id] = HealthReport{ - Stat: stat, - Err: err, - } + toReport := map[ID]HealthReport{} + for id, p := range st.paths { + stat, err := p.stat(st.localStorage) + + toReport[id] = HealthReport{ + Stat: stat, + Err: err, } + } - st.localLk.RUnlock() + st.localLk.RUnlock() - for id, report := range toReport { - if err := st.index.StorageReportHealth(ctx, id, report); err != nil { - log.Warnf("error reporting storage health for %s (%+v): %+v", id, report, err) - } + for id, report := range toReport { + if err := st.index.StorageReportHealth(ctx, id, report); err != nil { + log.Warnf("error reporting storage health for %s (%+v): %+v", id, report, err) } } } @@ -568,6 +572,8 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storifa log.Errorf("removing sector (%v) from %s: %+v", sid, spath, err) } + st.reportStorage(ctx) // report freed space + return nil } @@ -623,6 +629,8 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, ssize abi.Sect } } + st.reportStorage(ctx) // report space use changes + return nil } From 5b8f55522188c9f27a8b905b8bc9a6967e2e6ece Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 9 Nov 2020 21:44:28 +0100 Subject: [PATCH 2/2] sched: Update after tasks finish preparing --- extern/sector-storage/sched_worker.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/extern/sector-storage/sched_worker.go b/extern/sector-storage/sched_worker.go index e56e9056d..29c730b03 100644 --- a/extern/sector-storage/sched_worker.go +++ b/extern/sector-storage/sched_worker.go @@ -139,10 +139,17 @@ func (sw *schedWorker) handleWorker() { // wait for more tasks to be assigned by the main scheduler or for the worker // to finish precessing a task - update, ok := sw.waitForUpdates() + update, pokeSched, ok := sw.waitForUpdates() if !ok { return } + if pokeSched { + // a task has finished preparing, which can mean that we've freed some space on some worker + select { + case sched.workerChange <- struct{}{}: + default: // workerChange is buffered, and scheduling is global, so it's ok if we don't send here + } + } if update { break } @@ -257,23 +264,23 @@ func (sw *schedWorker) requestWindows() bool { return true } -func (sw *schedWorker) waitForUpdates() (update bool, ok bool) { +func (sw *schedWorker) waitForUpdates() (update bool, sched bool, ok bool) { select { case <-sw.heartbeatTimer.C: - return false, true + return false, false, true case w := <-sw.scheduledWindows: sw.worker.wndLk.Lock() sw.worker.activeWindows = append(sw.worker.activeWindows, w) sw.worker.wndLk.Unlock() - return true, true + return true, false, true case <-sw.taskDone: log.Debugw("task done", "workerid", sw.wid) - return true, true + return true, true, true case <-sw.sched.closing: case <-sw.worker.closingMgr: } - return false, false + return false, false, false } func (sw *schedWorker) workerCompactWindows() {