From 39e3ddb0ed1ea15b83b3c6d325cd3524376c85c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 16 Mar 2022 11:49:48 +0100 Subject: [PATCH] storagemgr: MoveStorage in separate steps --- extern/sector-storage/manager.go | 43 +++++++++++++++++++------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index 28e071559..595f6419e 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -12,6 +12,7 @@ import ( "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" @@ -589,7 +590,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect return xerrors.Errorf("acquiring sector lock: %w", err) } - fts := storiface.FTUnsealed + moveUnsealed := storiface.FTUnsealed { unsealedStores, err := m.index.StorageFindSector(ctx, sector.ID, storiface.FTUnsealed, 0, false) if err != nil { @@ -597,7 +598,7 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine - fts = storiface.FTNone + moveUnsealed = storiface.FTNone } } @@ -616,10 +617,10 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect } } - selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, false) + selector := newExistingSelector(m.index, sector.ID, storiface.FTCache|storiface.FTUpdateCache, false) err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalizeReplicaUpdate, selector, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|fts, pathType, storiface.AcquireMove), + m.schedFetch(sector, storiface.FTCache|storiface.FTUpdateCache|moveUnsealed, pathType, storiface.AcquireMove), func(ctx context.Context, w Worker) error { _, err := m.waitSimpleCall(ctx)(w.FinalizeReplicaUpdate(ctx, sector, keepUnsealed)) return err @@ -628,22 +629,30 @@ func (m *Manager) FinalizeReplicaUpdate(ctx context.Context, sector storage.Sect return err } - fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache, storiface.PathStorage) - moveUnsealed := fts - { - if len(keepUnsealed) == 0 { - moveUnsealed = storiface.FTNone + move := func(types storiface.SectorFileType) error { + fetchSel := newAllocSelector(m.index, types, storiface.PathStorage) + { + if len(keepUnsealed) == 0 { + moveUnsealed = storiface.FTNone + } } + + err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, + m.schedFetch(sector, types, storiface.PathStorage, storiface.AcquireMove), + func(ctx context.Context, w Worker) error { + _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, types)) + return err + }) + if err != nil { + return xerrors.Errorf("moving sector to storage: %w", err) + } + return nil } - err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, - m.schedFetch(sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed, storiface.PathStorage, storiface.AcquireMove), - func(ctx context.Context, w Worker) error { - _, err := m.waitSimpleCall(ctx)(w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|storiface.FTUpdate|storiface.FTUpdateCache|moveUnsealed)) - return err - }) - if err != nil { - return xerrors.Errorf("moving sector to storage: %w", err) + err = multierr.Append(move(storiface.FTUpdate|storiface.FTUpdateCache), move(storiface.FTCache)) + err = multierr.Append(err, move(storiface.FTSealed)) // Sealed separate from cache just in case ReleaseSectorKey was already called + if moveUnsealed != storiface.FTNone { + err = multierr.Append(err, move(moveUnsealed)) } return nil