sealing sched: Wait a bit for tasks to come in on restart
parent 7fdffc0340
commit 59d2034cbb
extern/sector-storage/manager.go (vendored): 29 lines changed
@@ -6,8 +6,8 @@ import (
 	"io"
 	"net/http"
 
-	"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
+	"github.com/hashicorp/go-multierror"
 
 	"github.com/ipfs/go-cid"
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/mitchellh/go-homedir"
@@ -17,6 +17,7 @@ import (
 	"github.com/filecoin-project/specs-storage/storage"
 
 	"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
+	"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
 	"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
 	"github.com/filecoin-project/lotus/extern/sector-storage/stores"
 	"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
@@ -463,25 +464,19 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
 		return xerrors.Errorf("acquiring sector lock: %w", err)
 	}
 
-	unsealed := stores.FTUnsealed
-	{
-		unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false)
-		if err != nil {
-			return xerrors.Errorf("finding unsealed sector: %w", err)
-		}
+	var err error
 
-		if len(unsealedStores) == 0 { // can be already removed
-			unsealed = stores.FTNone
-		}
+	if rerr := m.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil {
+		err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
+	}
+	if rerr := m.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil {
+		err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
+	}
+	if rerr := m.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil {
+		err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
 	}
 
-	selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false)
-
-	return m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
-		schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathStorage, stores.AcquireMove),
-		func(ctx context.Context, w Worker) error {
-			return w.Remove(ctx, sector)
-		})
+	return err
 }
 
 func (m *Manager) StorageLocal(ctx context.Context) (map[stores.ID]string, error) {
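The rewritten Remove no longer round-trips through the scheduler: instead of locating the unsealed copy and dispatching a TTFinalize task to a worker, it deletes the sealed, cache, and unsealed files directly and folds per-type failures into one error with go-multierror, so a failed removal neither aborts nor masks the others. Below is a minimal sketch of that accumulation pattern; removeAll and the string file types are hypothetical stand-ins for m.storage.Remove and stores.SectorFileType, not the real lotus API.

package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
	"golang.org/x/xerrors"
)

// removeAll mirrors the new Remove flow: attempt every file type even
// after a failure, and report all failures together as a single error.
// The remove callback and fileTypes strings are stand-ins, not lotus types.
func removeAll(remove func(ft string) error, fileTypes ...string) error {
	var err error
	for _, ft := range fileTypes {
		if rerr := remove(ft); rerr != nil {
			err = multierror.Append(err, xerrors.Errorf("removing sector (%s): %w", ft, rerr))
		}
	}
	return err // nil only if every removal succeeded
}

func main() {
	err := removeAll(func(ft string) error {
		if ft == "cache" {
			return fmt.Errorf("cache path unavailable")
		}
		return nil
	}, "sealed", "cache", "unsealed")
	fmt.Println(err) // reports the cache failure; sealed and unsealed were still attempted
}

Continuing past the first failure matters because the three file types can live on independent paths; returning early on the sealed error could strand the cache and unsealed files on disk.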
extern/sector-storage/sched.go (vendored): 17 lines changed
@@ -21,6 +21,7 @@ type schedPrioCtxKey int
 var SchedPriorityKey schedPrioCtxKey
 var DefaultSchedPriority = 0
 var SelectorTimeout = 5 * time.Second
+var InitWait = 3 * time.Second
 
 var (
 	SchedWindows = 2
@@ -221,6 +222,9 @@ func (sh *scheduler) runSched() {
 
 	go sh.runWorkerWatcher()
 
+	iw := time.After(InitWait)
+	var initialised bool
+
 	for {
 		select {
 		case w := <-sh.newWorkers:
@@ -231,18 +235,27 @@ func (sh *scheduler) runSched() {
 
 		case req := <-sh.schedule:
 			sh.schedQueue.Push(req)
-			sh.trySched()
+			if initialised {
+				sh.trySched()
+			}
 
 			if sh.testSync != nil {
 				sh.testSync <- struct{}{}
 			}
 		case req := <-sh.windowRequests:
 			sh.openWindows = append(sh.openWindows, req)
-			sh.trySched()
+			if initialised {
+				sh.trySched()
+			}
 
 		case ireq := <-sh.info:
 			ireq(sh.diag())
 
+		case <-iw:
+			initialised = true
+			iw = nil
+
+			sh.trySched()
 		case <-sh.closing:
 			sh.schedClose()
 			return
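The gate added here is a standard select idiom: arm a timer channel with time.After, queue incoming requests but skip trySched until the timer fires, then set the channel variable to nil (a receive from a nil channel blocks forever) so the case can never fire again, and flush the queue once. A self-contained sketch of the pattern, assuming toy schedule/closing channels in place of the real scheduler types:

package main

import (
	"fmt"
	"time"
)

var InitWait = 3 * time.Second // mirrors the new scheduler variable

// runSched is a stripped-down sketch of the gated loop: requests are
// queued immediately, but scheduling only runs once the init window
// has passed, giving tasks restored on restart time to arrive.
func runSched(schedule <-chan string, closing <-chan struct{}) {
	var queue []string
	trySched := func() { fmt.Println("scheduling", len(queue), "queued requests") }

	iw := time.After(InitWait)
	var initialised bool

	for {
		select {
		case req := <-schedule:
			queue = append(queue, req)
			if initialised {
				trySched()
			}

		case <-iw:
			initialised = true
			iw = nil // receives on a nil channel block forever, disabling this case

			trySched() // flush everything that queued up during the wait

		case <-closing:
			return
		}
	}
}

func main() {
	schedule := make(chan string)
	closing := make(chan struct{})
	go runSched(schedule, closing)

	schedule <- "sector-1" // accepted and queued, but not scheduled yet
	schedule <- "sector-2"
	time.Sleep(InitWait + time.Second) // after InitWait, both flush in one trySched pass
	close(closing)
}

Queuing during the wait rather than scheduling immediately is the point of the change: nothing is lost on restart, and the first trySched sees the full set of restored tasks and reconnected workers at once instead of assigning work based on a half-populated view.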