From c7b64bd6a952192a6f88e2d60c18a34aef0c4160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 22 Mar 2024 15:12:03 +0100 Subject: [PATCH] curio: Storage reservations when fetching --- curiosrc/ffi/sdr_funcs.go | 22 +++++++-- curiosrc/ffi/task_storage.go | 45 ++++++----------- storage/paths/interface.go | 2 +- storage/paths/local.go | 2 +- storage/paths/remote.go | 73 ++++++++++++++++------------ storage/sealer/storiface/filetype.go | 35 +++++++++++++ storage/sealer/storiface/paths.go | 12 +++++ 7 files changed, 124 insertions(+), 67 deletions(-) diff --git a/curiosrc/ffi/sdr_funcs.go b/curiosrc/ffi/sdr_funcs.go index 52d90f70d..4dc66bbde 100644 --- a/curiosrc/ffi/sdr_funcs.go +++ b/curiosrc/ffi/sdr_funcs.go @@ -80,6 +80,23 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask paths = resv.Paths storageIDs = resv.PathIDs releaseStorage = resv.Release + + if len(existing.AllSet()) > 0 { + // there are some "existing" files in the reservation. Some of them may need fetching, so call l.storage.AcquireSector + // (which unlike in the reservation code will be called on the paths.Remote instance) to ensure that the files are + // present locally. Note that we do not care about 'allocate' reqeuests, those files don't exist, and are just + // proposed paths with a reservation of space. + + _, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs})) + if err != nil { + return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err) + } + + // assert that checkPathIDs is the same as storageIDs + if storageIDs.Subset(existing) != checkPathIDs.Subset(existing) { + return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: pathIDs mismatch %#v != %#v", storageIDs, checkPathIDs) + } + } } else { var err error paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove) @@ -143,10 +160,7 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID, } func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, error) { - maybeUns := storiface.FTNone - // todo sectors with data - - paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, maybeUns, storiface.PathSealing) + paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing) if err != nil { return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err) } diff --git a/curiosrc/ffi/task_storage.go b/curiosrc/ffi/task_storage.go index ddc5e00a3..0fcb030dc 100644 --- a/curiosrc/ffi/task_storage.go +++ b/curiosrc/ffi/task_storage.go @@ -10,7 +10,6 @@ import ( "github.com/filecoin-project/lotus/lib/harmony/harmonytask" "github.com/filecoin-project/lotus/lib/harmony/resources" - "github.com/filecoin-project/lotus/lib/must" "github.com/filecoin-project/lotus/storage/sealer/storiface" ) @@ -111,6 +110,11 @@ func (t *TaskStorage) HasCapacity() bool { } func (t *TaskStorage) Claim(taskID int) error { + // TaskStorage Claim Attempts to reserve storage for the task + // A: Create a reservation for files to be allocated + // B: Create a reservation for existing files to be fetched into local storage + // C: Create a reservation for existing files in local storage which may be extended (e.g. sector cache when computing Trees) + ctx := context.Background() sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID)) @@ -121,7 +125,7 @@ func (t *TaskStorage) Claim(taskID int) error { // storage writelock sector lkctx, cancel := context.WithCancel(ctx) - allocate := storiface.FTCache + requestedTypes := t.alloc | t.existing lockAcquireTimuout := time.Second * 10 lockAcquireTimer := time.NewTimer(lockAcquireTimuout) @@ -135,7 +139,7 @@ func (t *TaskStorage) Claim(taskID int) error { } }() - if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, allocate); err != nil { + if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil { // timer will expire return xerrors.Errorf("claim StorageLock: %w", err) } @@ -149,39 +153,18 @@ func (t *TaskStorage) Claim(taskID int) error { lockAcquireTimer.Reset(0) }() - // find anywhere - // if found return nil, for now - s, err := t.sc.sectors.sindex.StorageFindSector(ctx, sectorRef.ID(), allocate, must.One(sectorRef.RegSealProof.SectorSize()), false) - if err != nil { - return xerrors.Errorf("claim StorageFindSector: %w", err) - } - - lp, err := t.sc.sectors.localStore.Local(ctx) - if err != nil { - return err - } - - // see if there are any non-local sector files in storage - for _, info := range s { - for _, l := range lp { - if l.ID == info.ID { - continue - } - - // TODO: Create reservation for fetching; This will require quite a bit more refactoring, but for now we'll - // only care about new allocations - return nil - } - } - - // acquire a path to make a reservation in - pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove) + // First see what we have locally. We are putting allocate and existing together because local acquire will look + // for existing files for allocate requests, separately existing files which aren't found locally will be need to + // be fetched, so we will need to create reservations for that too. + // NOTE localStore.AcquireSector does not open or create any files, nor does it reserve space. It only proposes + // paths to be used. + pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, storiface.PathSealing, storiface.AcquireMove) if err != nil { return err } // reserve the space - release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), allocate, pathIDs, storiface.FSOverheadSeal) + release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal) if err != nil { return err } diff --git a/storage/paths/interface.go b/storage/paths/interface.go index 4ff206c6d..27d6ee541 100644 --- a/storage/paths/interface.go +++ b/storage/paths/interface.go @@ -35,7 +35,7 @@ type PartialFileHandler interface { //go:generate go run github.com/golang/mock/mockgen -destination=mocks/store.go -package=mocks . Store type Store interface { - AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error) + AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error) Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool, keepIn []storiface.ID) error // like remove, but doesn't remove the primary sector copy, nor the last diff --git a/storage/paths/local.go b/storage/paths/local.go index 7dd7c1256..3efe099e6 100644 --- a/storage/paths/local.go +++ b/storage/paths/local.go @@ -460,7 +460,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif return done, nil } -func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { +func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } diff --git a/storage/paths/remote.go b/storage/paths/remote.go index 9ff719954..94b7f511a 100644 --- a/storage/paths/remote.go +++ b/storage/paths/remote.go @@ -93,11 +93,28 @@ func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int, } } -func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) { +func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) { if existing|allocate != existing^allocate { return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector") } + settings := storiface.AcquireSettings{ + // Into will tell us which paths things should be fetched into or allocated in. + Into: nil, + } + for _, o := range opts { + o(&settings) + } + + if settings.Into != nil { + if !allocate.IsNone() { + return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("cannot specify Into with allocate") + } + if !settings.Into.HasAllSet(existing) { + return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("Into has to have all existing paths") + } + } + // First make sure that no other goroutines are trying to fetch this sector; // wait if there are any. for { @@ -134,47 +151,43 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist } var toFetch storiface.SectorFileType - for _, fileType := range storiface.PathTypes { - if fileType&existing == 0 { - continue - } - + for _, fileType := range existing.AllSet() { if storiface.PathByType(paths, fileType) == "" { toFetch |= fileType } } // get a list of paths to fetch data into. Note: file type filters will apply inside this call. - fetchPaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op) - if err != nil { - return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) - } + var fetchPaths, fetchIDs storiface.SectorPaths - overheadTable := storiface.FSOverheadSeal - if pathType == storiface.PathStorage { - overheadTable = storiface.FsOverheadFinalized - } - - // If any path types weren't found in local storage, try fetching them - - // First reserve storage - releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, overheadTable) - if err != nil { - return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) - } - defer releaseStorage() - - for _, fileType := range storiface.PathTypes { - if fileType&existing == 0 { - continue + if settings.Into == nil { + // fetching without existing reservation, so allocate paths and create a reservation + fetchPaths, fetchIDs, err = r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op) + if err != nil { + return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) } - if storiface.PathByType(paths, fileType) != "" { - continue + overheadTable := storiface.FSOverheadSeal + if pathType == storiface.PathStorage { + overheadTable = storiface.FsOverheadFinalized } + // If any path types weren't found in local storage, try fetching them + + // First reserve storage + releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable) + if err != nil { + return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) + } + defer releaseStorage() + } else { + fetchPaths = settings.Into.Paths + fetchIDs = settings.Into.IDs + } + + for _, fileType := range toFetch.AllSet() { dest := storiface.PathByType(fetchPaths, fileType) - storageID := storiface.PathByType(ids, fileType) + storageID := storiface.PathByType(fetchIDs, fileType) url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest) if err != nil { diff --git a/storage/sealer/storiface/filetype.go b/storage/sealer/storiface/filetype.go index 109e494a8..422f87cf3 100644 --- a/storage/sealer/storiface/filetype.go +++ b/storage/sealer/storiface/filetype.go @@ -214,6 +214,10 @@ func (t SectorFileType) All() [FileTypes]bool { return out } +func (t SectorFileType) IsNone() bool { + return t == 0 +} + type SectorPaths struct { ID abi.SectorID @@ -225,6 +229,28 @@ type SectorPaths struct { Piece string } +func (sp SectorPaths) HasAllSet(ft SectorFileType) bool { + for _, fileType := range ft.AllSet() { + if PathByType(sp, fileType) == "" { + return false + } + } + + return true +} + +func (sp SectorPaths) Subset(filter SectorFileType) SectorPaths { + var out SectorPaths + + for _, fileType := range filter.AllSet() { + SetPathByType(&out, fileType, PathByType(sp, fileType)) + } + + out.ID = sp.ID + + return out +} + func ParseSectorID(baseName string) (abi.SectorID, error) { var n abi.SectorNumber var mid abi.ActorID @@ -282,3 +308,12 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) { sps.Piece = p } } + +type PathsWithIDs struct { + Paths SectorPaths + IDs SectorPaths +} + +func (p PathsWithIDs) HasAllSet(ft SectorFileType) bool { + return p.Paths.HasAllSet(ft) && p.IDs.HasAllSet(ft) +} diff --git a/storage/sealer/storiface/paths.go b/storage/sealer/storiface/paths.go index 2cb4f34d3..0f0eaeadf 100644 --- a/storage/sealer/storiface/paths.go +++ b/storage/sealer/storiface/paths.go @@ -25,3 +25,15 @@ type SectorLock struct { type SectorLocks struct { Locks []SectorLock } + +type AcquireSettings struct { + Into *PathsWithIDs +} + +type AcquireOption func(*AcquireSettings) + +func AcquireInto(pathIDs PathsWithIDs) AcquireOption { + return func(settings *AcquireSettings) { + settings.Into = &pathIDs + } +}