From 6855284d88c8e19beed4c5bc8a7f374d1babd0da Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Wed, 30 Sep 2020 17:26:09 +0200
Subject: [PATCH] sectorstorage: Cancel non-running work in case of abort in sched

---
Review note (this region is ignored when the patch is applied):
  getWork now also returns a cancel func. For freshly tracked work the
  cancel func ends the tracked entry if it is still in the wsStarted state;
  for already-tracked work it is currently a no-op.
  The three added log.Warn calls that passed a printf-style format string
  were changed to log.Warnf so that wid is interpolated (assumes log is a
  go-log/zap style logger whose Warn does not format). The blob hash on the
  manager_calltracker.go index line predates that change.

 extern/sector-storage/manager.go             | 12 +++--
 extern/sector-storage/manager_calltracker.go | 49 +++++++++++++++++---
 2 files changed, 51 insertions(+), 10 deletions(-)

diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go
index 7d49cc958..afcc28ffd 100644
--- a/extern/sector-storage/manager.go
+++ b/extern/sector-storage/manager.go
@@ -364,10 +364,11 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
+	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
 	if err != nil {
 		return nil, xerrors.Errorf("getWork: %w", err)
 	}
+	defer cancel()
 
 	waitRes := func() {
 		p, werr := m.waitWork(ctx, wk)
@@ -408,10 +409,11 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
+	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
 	if err != nil {
 		return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err)
 	}
+	defer cancel()
 
 	waitRes := func() {
 		p, werr := m.waitWork(ctx, wk)
@@ -449,10 +451,11 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	wk, wait, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
+	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
 	if err != nil {
 		return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err)
 	}
+	defer cancel()
 
 	waitRes := func() {
 		p, werr := m.waitWork(ctx, wk)
@@ -490,10 +493,11 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
 }
 
 func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
-	wk, wait, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
+	wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
 	if err != nil {
 		return storage.Proof{}, xerrors.Errorf("getWork: %w", err)
 	}
+	defer cancel()
 
 	waitRes := func() {
 		p, werr := m.waitWork(ctx, wk)
diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go
index 147e11b91..1135af4af 100644
--- a/extern/sector-storage/manager_calltracker.go
+++ b/extern/sector-storage/manager_calltracker.go
@@ -102,10 +102,10 @@ func (m *Manager) setupWorkTracker() {
 }
 
 // returns wait=true when the task is already tracked/running
-func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, err error) {
+func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, cancel func(), err error) {
 	wid, err = newWorkID(method, params)
 	if err != nil {
-		return WorkID{}, false, xerrors.Errorf("creating WorkID: %w", err)
+		return WorkID{}, false, nil, xerrors.Errorf("creating WorkID: %w", err)
 	}
 
 	m.workLk.Lock()
@@ -113,7 +113,7 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
 
 	have, err := m.work.Has(wid)
 	if err != nil {
-		return WorkID{}, false, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
+		return WorkID{}, false, nil, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
 	}
 
 	if !have {
@@ -122,15 +122,52 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
 			Status: wsStarted,
 		})
 		if err != nil {
-			return WorkID{}, false, xerrors.Errorf("failed to track task start: %w", err)
+			return WorkID{}, false, nil, xerrors.Errorf("failed to track task start: %w", err)
 		}
 
-		return wid, false, nil
+		return wid, false, func() {
+			m.workLk.Lock()
+			defer m.workLk.Unlock()
+
+			have, err := m.work.Has(wid)
+			if err != nil {
+				log.Errorf("cancel: work has error: %+v", err)
+				return
+			}
+
+			if !have {
+				return // expected / happy path
+			}
+
+			var ws WorkState
+			if err := m.work.Get(wid).Get(&ws); err != nil {
+				log.Errorf("cancel: get work %s: %+v", wid, err)
+				return
+			}
+
+			switch ws.Status {
+			case wsStarted:
+				log.Warnf("canceling started (not running) work %s", wid)
+
+				if err := m.work.Get(wid).End(); err != nil {
+					log.Errorf("cancel: failed to cancel started work %s: %+v", wid, err)
+					return
+				}
+			case wsDone:
+				// TODO: still remove?
+				log.Warnf("cancel called on work %s in 'done' state", wid)
+			case wsRunning:
+				log.Warnf("cancel called on work %s in 'running' state (manager shutting down?)", wid)
+			}
+
+		}, nil
 	}
 
 	// already started
 
-	return wid, true, nil
+	return wid, true, func() {
+		// TODO: cancel is currently a no-op for work that was already tracked
+	}, nil
 }
 
 func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storiface.CallID, err error) error {