curio: Storage reservations when fetching
This commit is contained in:
parent
662ea6f2d5
commit
c7b64bd6a9
@ -80,6 +80,23 @@ func (l *storageProvider) AcquireSector(ctx context.Context, taskID *harmonytask
|
||||
paths = resv.Paths
|
||||
storageIDs = resv.PathIDs
|
||||
releaseStorage = resv.Release
|
||||
|
||||
if len(existing.AllSet()) > 0 {
|
||||
// there are some "existing" files in the reservation. Some of them may need fetching, so call l.storage.AcquireSector
|
||||
// (which unlike in the reservation code will be called on the paths.Remote instance) to ensure that the files are
|
||||
// present locally. Note that we do not care about 'allocate' reqeuests, those files don't exist, and are just
|
||||
// proposed paths with a reservation of space.
|
||||
|
||||
_, checkPathIDs, err := l.storage.AcquireSector(ctx, sector, existing, storiface.FTNone, sealing, storiface.AcquireMove, storiface.AcquireInto(storiface.PathsWithIDs{Paths: paths, IDs: storageIDs}))
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: %w", err)
|
||||
}
|
||||
|
||||
// assert that checkPathIDs is the same as storageIDs
|
||||
if storageIDs.Subset(existing) != checkPathIDs.Subset(existing) {
|
||||
return storiface.SectorPaths{}, nil, xerrors.Errorf("acquire reserved existing files: pathIDs mismatch %#v != %#v", storageIDs, checkPathIDs)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var err error
|
||||
paths, storageIDs, err = l.storage.AcquireSector(ctx, sector, existing, allocate, sealing, storiface.AcquireMove)
|
||||
@ -143,10 +160,7 @@ func (sb *SealCalls) GenerateSDR(ctx context.Context, taskID harmonytask.TaskID,
|
||||
}
|
||||
|
||||
func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, size abi.PaddedPieceSize, data io.Reader, unpaddedData bool) (cid.Cid, error) {
|
||||
maybeUns := storiface.FTNone
|
||||
// todo sectors with data
|
||||
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, maybeUns, storiface.PathSealing)
|
||||
paths, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||
}
|
||||
|
@ -10,7 +10,6 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/harmony/harmonytask"
|
||||
"github.com/filecoin-project/lotus/lib/harmony/resources"
|
||||
"github.com/filecoin-project/lotus/lib/must"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -111,6 +110,11 @@ func (t *TaskStorage) HasCapacity() bool {
|
||||
}
|
||||
|
||||
func (t *TaskStorage) Claim(taskID int) error {
|
||||
// TaskStorage Claim Attempts to reserve storage for the task
|
||||
// A: Create a reservation for files to be allocated
|
||||
// B: Create a reservation for existing files to be fetched into local storage
|
||||
// C: Create a reservation for existing files in local storage which may be extended (e.g. sector cache when computing Trees)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
sectorRef, err := t.taskToSectorRef(harmonytask.TaskID(taskID))
|
||||
@ -121,7 +125,7 @@ func (t *TaskStorage) Claim(taskID int) error {
|
||||
// storage writelock sector
|
||||
lkctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
allocate := storiface.FTCache
|
||||
requestedTypes := t.alloc | t.existing
|
||||
|
||||
lockAcquireTimuout := time.Second * 10
|
||||
lockAcquireTimer := time.NewTimer(lockAcquireTimuout)
|
||||
@ -135,7 +139,7 @@ func (t *TaskStorage) Claim(taskID int) error {
|
||||
}
|
||||
}()
|
||||
|
||||
if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, allocate); err != nil {
|
||||
if err := t.sc.sectors.sindex.StorageLock(lkctx, sectorRef.ID(), storiface.FTNone, requestedTypes); err != nil {
|
||||
// timer will expire
|
||||
return xerrors.Errorf("claim StorageLock: %w", err)
|
||||
}
|
||||
@ -149,39 +153,18 @@ func (t *TaskStorage) Claim(taskID int) error {
|
||||
lockAcquireTimer.Reset(0)
|
||||
}()
|
||||
|
||||
// find anywhere
|
||||
// if found return nil, for now
|
||||
s, err := t.sc.sectors.sindex.StorageFindSector(ctx, sectorRef.ID(), allocate, must.One(sectorRef.RegSealProof.SectorSize()), false)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("claim StorageFindSector: %w", err)
|
||||
}
|
||||
|
||||
lp, err := t.sc.sectors.localStore.Local(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// see if there are any non-local sector files in storage
|
||||
for _, info := range s {
|
||||
for _, l := range lp {
|
||||
if l.ID == info.ID {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Create reservation for fetching; This will require quite a bit more refactoring, but for now we'll
|
||||
// only care about new allocations
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// acquire a path to make a reservation in
|
||||
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, allocate, storiface.PathSealing, storiface.AcquireMove)
|
||||
// First see what we have locally. We are putting allocate and existing together because local acquire will look
|
||||
// for existing files for allocate requests, separately existing files which aren't found locally will be need to
|
||||
// be fetched, so we will need to create reservations for that too.
|
||||
// NOTE localStore.AcquireSector does not open or create any files, nor does it reserve space. It only proposes
|
||||
// paths to be used.
|
||||
pathsFs, pathIDs, err := t.sc.sectors.localStore.AcquireSector(ctx, sectorRef.Ref(), storiface.FTNone, requestedTypes, storiface.PathSealing, storiface.AcquireMove)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// reserve the space
|
||||
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), allocate, pathIDs, storiface.FSOverheadSeal)
|
||||
release, err := t.sc.sectors.localStore.Reserve(ctx, sectorRef.Ref(), requestedTypes, pathIDs, storiface.FSOverheadSeal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ type PartialFileHandler interface {
|
||||
//go:generate go run github.com/golang/mock/mockgen -destination=mocks/store.go -package=mocks . Store
|
||||
|
||||
type Store interface {
|
||||
AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
|
||||
AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
|
||||
Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool, keepIn []storiface.ID) error
|
||||
|
||||
// like remove, but doesn't remove the primary sector copy, nor the last
|
||||
|
@ -460,7 +460,7 @@ func (st *Local) Reserve(ctx context.Context, sid storiface.SectorRef, ft storif
|
||||
return done, nil
|
||||
}
|
||||
|
||||
func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||
func (st *Local) AcquireSector(ctx context.Context, sid storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||
if existing|allocate != existing^allocate {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||
}
|
||||
|
@ -93,11 +93,28 @@ func NewRemote(local Store, index SectorIndex, auth http.Header, fetchLimit int,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||
func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode, opts ...storiface.AcquireOption) (storiface.SectorPaths, storiface.SectorPaths, error) {
|
||||
if existing|allocate != existing^allocate {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||
}
|
||||
|
||||
settings := storiface.AcquireSettings{
|
||||
// Into will tell us which paths things should be fetched into or allocated in.
|
||||
Into: nil,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(&settings)
|
||||
}
|
||||
|
||||
if settings.Into != nil {
|
||||
if !allocate.IsNone() {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("cannot specify Into with allocate")
|
||||
}
|
||||
if !settings.Into.HasAllSet(existing) {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("Into has to have all existing paths")
|
||||
}
|
||||
}
|
||||
|
||||
// First make sure that no other goroutines are trying to fetch this sector;
|
||||
// wait if there are any.
|
||||
for {
|
||||
@ -134,47 +151,43 @@ func (r *Remote) AcquireSector(ctx context.Context, s storiface.SectorRef, exist
|
||||
}
|
||||
|
||||
var toFetch storiface.SectorFileType
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&existing == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
for _, fileType := range existing.AllSet() {
|
||||
if storiface.PathByType(paths, fileType) == "" {
|
||||
toFetch |= fileType
|
||||
}
|
||||
}
|
||||
|
||||
// get a list of paths to fetch data into. Note: file type filters will apply inside this call.
|
||||
fetchPaths, ids, err := r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
var fetchPaths, fetchIDs storiface.SectorPaths
|
||||
|
||||
overheadTable := storiface.FSOverheadSeal
|
||||
if pathType == storiface.PathStorage {
|
||||
overheadTable = storiface.FsOverheadFinalized
|
||||
}
|
||||
|
||||
// If any path types weren't found in local storage, try fetching them
|
||||
|
||||
// First reserve storage
|
||||
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, ids, overheadTable)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
|
||||
}
|
||||
defer releaseStorage()
|
||||
|
||||
for _, fileType := range storiface.PathTypes {
|
||||
if fileType&existing == 0 {
|
||||
continue
|
||||
if settings.Into == nil {
|
||||
// fetching without existing reservation, so allocate paths and create a reservation
|
||||
fetchPaths, fetchIDs, err = r.local.AcquireSector(ctx, s, storiface.FTNone, toFetch, pathType, op)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
|
||||
}
|
||||
|
||||
if storiface.PathByType(paths, fileType) != "" {
|
||||
continue
|
||||
overheadTable := storiface.FSOverheadSeal
|
||||
if pathType == storiface.PathStorage {
|
||||
overheadTable = storiface.FsOverheadFinalized
|
||||
}
|
||||
|
||||
// If any path types weren't found in local storage, try fetching them
|
||||
|
||||
// First reserve storage
|
||||
releaseStorage, err := r.local.Reserve(ctx, s, toFetch, fetchIDs, overheadTable)
|
||||
if err != nil {
|
||||
return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
|
||||
}
|
||||
defer releaseStorage()
|
||||
} else {
|
||||
fetchPaths = settings.Into.Paths
|
||||
fetchIDs = settings.Into.IDs
|
||||
}
|
||||
|
||||
for _, fileType := range toFetch.AllSet() {
|
||||
dest := storiface.PathByType(fetchPaths, fileType)
|
||||
storageID := storiface.PathByType(ids, fileType)
|
||||
storageID := storiface.PathByType(fetchIDs, fileType)
|
||||
|
||||
url, err := r.acquireFromRemote(ctx, s.ID, fileType, dest)
|
||||
if err != nil {
|
||||
|
@ -214,6 +214,10 @@ func (t SectorFileType) All() [FileTypes]bool {
|
||||
return out
|
||||
}
|
||||
|
||||
func (t SectorFileType) IsNone() bool {
|
||||
return t == 0
|
||||
}
|
||||
|
||||
type SectorPaths struct {
|
||||
ID abi.SectorID
|
||||
|
||||
@ -225,6 +229,28 @@ type SectorPaths struct {
|
||||
Piece string
|
||||
}
|
||||
|
||||
func (sp SectorPaths) HasAllSet(ft SectorFileType) bool {
|
||||
for _, fileType := range ft.AllSet() {
|
||||
if PathByType(sp, fileType) == "" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (sp SectorPaths) Subset(filter SectorFileType) SectorPaths {
|
||||
var out SectorPaths
|
||||
|
||||
for _, fileType := range filter.AllSet() {
|
||||
SetPathByType(&out, fileType, PathByType(sp, fileType))
|
||||
}
|
||||
|
||||
out.ID = sp.ID
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func ParseSectorID(baseName string) (abi.SectorID, error) {
|
||||
var n abi.SectorNumber
|
||||
var mid abi.ActorID
|
||||
@ -282,3 +308,12 @@ func SetPathByType(sps *SectorPaths, fileType SectorFileType, p string) {
|
||||
sps.Piece = p
|
||||
}
|
||||
}
|
||||
|
||||
type PathsWithIDs struct {
|
||||
Paths SectorPaths
|
||||
IDs SectorPaths
|
||||
}
|
||||
|
||||
func (p PathsWithIDs) HasAllSet(ft SectorFileType) bool {
|
||||
return p.Paths.HasAllSet(ft) && p.IDs.HasAllSet(ft)
|
||||
}
|
||||
|
@ -25,3 +25,15 @@ type SectorLock struct {
|
||||
type SectorLocks struct {
|
||||
Locks []SectorLock
|
||||
}
|
||||
|
||||
type AcquireSettings struct {
|
||||
Into *PathsWithIDs
|
||||
}
|
||||
|
||||
type AcquireOption func(*AcquireSettings)
|
||||
|
||||
func AcquireInto(pathIDs PathsWithIDs) AcquireOption {
|
||||
return func(settings *AcquireSettings) {
|
||||
settings.Into = &pathIDs
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user