sectorstorage: Cancel non-running work in case of abort in sched

This commit is contained in:
Łukasz Magiera 2020-09-30 17:26:09 +02:00
parent 46a5beafe4
commit 6855284d88
2 changed files with 51 additions and 10 deletions

View File

@ -364,10 +364,11 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit1, sector, ticket, pieces)
if err != nil {
return nil, xerrors.Errorf("getWork: %w", err)
}
defer cancel()
waitRes := func() {
p, werr := m.waitWork(ctx, wk)
@ -408,10 +409,11 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTPreCommit2, sector, phase1Out)
if err != nil {
return storage.SectorCids{}, xerrors.Errorf("getWork: %w", err)
}
defer cancel()
waitRes := func() {
p, werr := m.waitWork(ctx, wk)
@ -449,10 +451,11 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wk, wait, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit1, sector, ticket, seed, pieces, cids)
if err != nil {
return storage.Commit1Out{}, xerrors.Errorf("getWork: %w", err)
}
defer cancel()
waitRes := func() {
p, werr := m.waitWork(ctx, wk)
@ -490,10 +493,11 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
}
func (m *Manager) SealCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.Commit1Out) (out storage.Proof, err error) {
wk, wait, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
wk, wait, cancel, err := m.getWork(ctx, sealtasks.TTCommit2, sector, phase1Out)
if err != nil {
return storage.Proof{}, xerrors.Errorf("getWork: %w", err)
}
defer cancel()
waitRes := func() {
p, werr := m.waitWork(ctx, wk)

View File

@ -102,10 +102,10 @@ func (m *Manager) setupWorkTracker() {
}
// returns wait=true when the task is already tracked/running
func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, err error) {
func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params ...interface{}) (wid WorkID, wait bool, cancel func(), err error) {
wid, err = newWorkID(method, params)
if err != nil {
return WorkID{}, false, xerrors.Errorf("creating WorkID: %w", err)
return WorkID{}, false, nil, xerrors.Errorf("creating WorkID: %w", err)
}
m.workLk.Lock()
@ -113,7 +113,7 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
have, err := m.work.Has(wid)
if err != nil {
return WorkID{}, false, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
return WorkID{}, false, nil, xerrors.Errorf("failed to check if the task is already tracked: %w", err)
}
if !have {
@ -122,15 +122,52 @@ func (m *Manager) getWork(ctx context.Context, method sealtasks.TaskType, params
Status: wsStarted,
})
if err != nil {
return WorkID{}, false, xerrors.Errorf("failed to track task start: %w", err)
return WorkID{}, false, nil, xerrors.Errorf("failed to track task start: %w", err)
}
return wid, false, nil
return wid, false, func() {
m.workLk.Lock()
defer m.workLk.Unlock()
have, err := m.work.Has(wid)
if err != nil {
log.Errorf("cancel: work has error: %+v", err)
return
}
if !have {
return // expected / happy path
}
var ws WorkState
if err := m.work.Get(wid).Get(&ws); err != nil {
log.Errorf("cancel: get work %s: %+v", wid, err)
return
}
switch ws.Status {
case wsStarted:
log.Warn("canceling started (not running) work %s", wid)
if err := m.work.Get(wid).End(); err != nil {
log.Errorf("cancel: failed to cancel started work %s: %+v", wid, err)
return
}
case wsDone:
// TODO: still remove?
log.Warn("cancel called on work %s in 'done' state", wid)
case wsRunning:
log.Warn("cancel called on work %s in 'running' state (manager shutting down?)", wid)
}
}, nil
}
// already started
return wid, true, nil
return wid, true, func() {
// TODO
}, nil
}
func (m *Manager) startWork(ctx context.Context, wk WorkID) func(callID storiface.CallID, err error) error {