Merge pull request #65 from filecoin-project/feat/local-space-alloc
Better storage reservation logic
This commit is contained in:
commit
77cf6e49a0
@ -43,9 +43,11 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, false, stores.AcquireMove)
|
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, stores.PathStorage, stores.AcquireMove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire sector in checkProvable: %w", err)
|
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
|
||||||
|
bad = append(bad, sector)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if lp.Sealed == "" || lp.Cache == "" {
|
if lp.Sealed == "" || lp.Cache == "" {
|
||||||
|
@ -82,7 +82,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
|
|
||||||
var stagedPath stores.SectorPaths
|
var stagedPath stores.SectorPaths
|
||||||
if len(existingPieceSizes) == 0 {
|
if len(existingPieceSizes) == 0 {
|
||||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, true)
|
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, stores.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -92,7 +92,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
|
|||||||
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
|
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, true)
|
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
|
||||||
}
|
}
|
||||||
@ -199,12 +199,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
|||||||
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
|
maxPieceSize := abi.PaddedPieceSize(sb.ssize)
|
||||||
|
|
||||||
// try finding existing
|
// try finding existing
|
||||||
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
|
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||||
var pf *partialFile
|
var pf *partialFile
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
case xerrors.Is(err, storiface.ErrSectorNotFound):
|
case xerrors.Is(err, storiface.ErrSectorNotFound):
|
||||||
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, false)
|
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
|
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
|
||||||
}
|
}
|
||||||
@ -241,7 +241,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, false)
|
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire sealed sector paths: %w", err)
|
return xerrors.Errorf("acquire sealed sector paths: %w", err)
|
||||||
}
|
}
|
||||||
@ -359,7 +359,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error {
|
||||||
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, false)
|
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire unsealed sector path: %w", err)
|
return xerrors.Errorf("acquire unsealed sector path: %w", err)
|
||||||
}
|
}
|
||||||
@ -396,7 +396,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, true)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, stores.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
|
return nil, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||||
}
|
}
|
||||||
@ -453,7 +453,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
|
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
|
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
|
||||||
}
|
}
|
||||||
@ -471,7 +471,7 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
|
func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, true)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("acquire sector paths: %w", err)
|
return nil, xerrors.Errorf("acquire sector paths: %w", err)
|
||||||
}
|
}
|
||||||
@ -559,7 +559,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, false)
|
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
return xerrors.Errorf("acquiring sector cache path: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
|
|||||||
t.Fatal("read wrong bytes")
|
t.Fatal("read wrong bytes")
|
||||||
}
|
}
|
||||||
|
|
||||||
p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, false)
|
p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorIn
|
|||||||
|
|
||||||
sid := abi.SectorID{Miner: mid, Number: s.SectorNumber}
|
sid := abi.SectorID{Miner: mid, Number: s.SectorNumber}
|
||||||
|
|
||||||
paths, d, err := sb.sectors.AcquireSector(ctx, sid, stores.FTCache|stores.FTSealed, 0, false)
|
paths, d, err := sb.sectors.AcquireSector(ctx, sid, stores.FTCache|stores.FTSealed, 0, stores.PathStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err)
|
log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err)
|
||||||
skipped = append(skipped, sid)
|
skipped = append(skipped, sid)
|
||||||
|
@ -61,14 +61,22 @@ type localWorkerPathProvider struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
|
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) {
|
||||||
|
|
||||||
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
|
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return stores.SectorPaths{}, nil, err
|
return stores.SectorPaths{}, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal)
|
||||||
|
if err != nil {
|
||||||
|
return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
|
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
|
||||||
|
|
||||||
return paths, func() {
|
return paths, func() {
|
||||||
|
releaseStorage()
|
||||||
|
|
||||||
for _, fileType := range pathTypes {
|
for _, fileType := range pathTypes {
|
||||||
if fileType&allocate == 0 {
|
if fileType&allocate == 0 {
|
||||||
continue
|
continue
|
||||||
|
@ -218,12 +218,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
|
|||||||
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
|
// TODO: Optimization: don't send unseal to a worker if the requested range is already unsealed
|
||||||
|
|
||||||
unsealFetch := func(ctx context.Context, worker Worker) error {
|
unsealFetch := func(ctx context.Context, worker Worker) error {
|
||||||
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, true, stores.AcquireCopy); err != nil {
|
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil {
|
||||||
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
return xerrors.Errorf("copy sealed/cache sector data: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(best) > 0 {
|
if len(best) > 0 {
|
||||||
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, true, stores.AcquireMove); err != nil {
|
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil {
|
||||||
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
return xerrors.Errorf("copy unsealed sector data: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,10 @@ import (
|
|||||||
|
|
||||||
type testStorage stores.StorageConfig
|
type testStorage stores.StorageConfig
|
||||||
|
|
||||||
|
func (t testStorage) DiskUsage(path string) (int64, error) {
|
||||||
|
return 1, nil // close enough
|
||||||
|
}
|
||||||
|
|
||||||
func newTestStorage(t *testing.T) *testStorage {
|
func newTestStorage(t *testing.T) *testStorage {
|
||||||
tp, err := ioutil.TempDir(os.TempDir(), "sector-storage-test-")
|
tp, err := ioutil.TempDir(os.TempDir(), "sector-storage-test-")
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -19,15 +19,17 @@ const (
|
|||||||
FTNone SectorFileType = 0
|
FTNone SectorFileType = 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const FSOverheadDen = 10
|
||||||
|
|
||||||
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
|
var FSOverheadSeal = map[SectorFileType]int{ // 10x overheads
|
||||||
FTUnsealed: 10,
|
FTUnsealed: FSOverheadDen,
|
||||||
FTSealed: 10,
|
FTSealed: FSOverheadDen,
|
||||||
FTCache: 70, // TODO: confirm for 32G
|
FTCache: 141, // 11 layers + D(2x ssize) + C + R
|
||||||
}
|
}
|
||||||
|
|
||||||
var FsOverheadFinalized = map[SectorFileType]int{
|
var FsOverheadFinalized = map[SectorFileType]int{
|
||||||
FTUnsealed: 10,
|
FTUnsealed: FSOverheadDen,
|
||||||
FTSealed: 10,
|
FTSealed: FSOverheadDen,
|
||||||
FTCache: 2,
|
FTCache: 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -67,7 +69,7 @@ func (t SectorFileType) SealSpaceUse(spt abi.RegisteredSealProof) (uint64, error
|
|||||||
return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
|
return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
|
||||||
}
|
}
|
||||||
|
|
||||||
need += uint64(oh) * uint64(ssize) / 10
|
need += uint64(oh) * uint64(ssize) / FSOverheadDen
|
||||||
}
|
}
|
||||||
|
|
||||||
return need, nil
|
return need, nil
|
||||||
|
@ -72,13 +72,15 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
|
|||||||
// The caller has a lock on this sector already, no need to get one here
|
// The caller has a lock on this sector already, no need to get one here
|
||||||
|
|
||||||
// passing 0 spt because we don't allocate anything
|
// passing 0 spt because we don't allocate anything
|
||||||
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, false, AcquireMove)
|
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, PathStorage, AcquireMove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("%+v", err)
|
log.Error("%+v", err)
|
||||||
w.WriteHeader(500)
|
w.WriteHeader(500)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: reserve local storage here
|
||||||
|
|
||||||
path := PathByType(paths, ft)
|
path := PathByType(paths, ft)
|
||||||
if path == "" {
|
if path == "" {
|
||||||
log.Error("acquired path was empty")
|
log.Error("acquired path was empty")
|
||||||
|
@ -361,7 +361,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if spaceReq > p.fsi.Available {
|
if spaceReq > uint64(p.fsi.Available) {
|
||||||
log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
|
log.Debugf("not allocating on %s, out of space (available: %d, need: %d)", p.info.ID, p.fsi.Available, spaceReq)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -9,11 +9,11 @@ import (
|
|||||||
"github.com/filecoin-project/specs-actors/actors/abi"
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
)
|
)
|
||||||
|
|
||||||
type PathType bool
|
type PathType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
PathStorage = false
|
PathStorage = "storage"
|
||||||
PathSealing = true
|
PathSealing = "sealing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type AcquireMode string
|
type AcquireMode string
|
||||||
@ -44,13 +44,14 @@ func Stat(path string) (FsStat, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return FsStat{
|
return FsStat{
|
||||||
Capacity: stat.Blocks * uint64(stat.Bsize),
|
Capacity: int64(stat.Blocks) * stat.Bsize,
|
||||||
Available: stat.Bavail * uint64(stat.Bsize),
|
Available: int64(stat.Bavail) * stat.Bsize,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type FsStat struct {
|
type FsStat struct {
|
||||||
Capacity uint64
|
Capacity int64
|
||||||
Available uint64 // Available to use for sector storage
|
Available int64 // Available to use for sector storage
|
||||||
Used uint64
|
Used int64
|
||||||
|
Reserved int64
|
||||||
}
|
}
|
||||||
|
119
stores/local.go
119
stores/local.go
@ -49,6 +49,7 @@ type LocalStorage interface {
|
|||||||
SetStorage(func(*StorageConfig)) error
|
SetStorage(func(*StorageConfig)) error
|
||||||
|
|
||||||
Stat(path string) (FsStat, error)
|
Stat(path string) (FsStat, error)
|
||||||
|
DiskUsage(path string) (int64, error) // returns real disk usage for a file/directory
|
||||||
}
|
}
|
||||||
|
|
||||||
const MetaFile = "sectorstore.json"
|
const MetaFile = "sectorstore.json"
|
||||||
@ -67,6 +68,50 @@ type Local struct {
|
|||||||
|
|
||||||
type path struct {
|
type path struct {
|
||||||
local string // absolute local path
|
local string // absolute local path
|
||||||
|
|
||||||
|
reserved int64
|
||||||
|
reservations map[abi.SectorID]SectorFileType
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *path) stat(ls LocalStorage) (FsStat, error) {
|
||||||
|
stat, err := ls.Stat(p.local)
|
||||||
|
if err != nil {
|
||||||
|
return FsStat{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
stat.Reserved = p.reserved
|
||||||
|
|
||||||
|
for id, ft := range p.reservations {
|
||||||
|
for _, fileType := range PathTypes {
|
||||||
|
if fileType&ft == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
used, err := ls.DiskUsage(p.sectorPath(id, fileType))
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("getting disk usage of '%s': %+v", p.sectorPath(id, fileType), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
stat.Reserved -= used
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat.Reserved < 0 {
|
||||||
|
log.Warnf("negative reserved storage: p.reserved=%d, reserved: %d", p.reserved, stat.Reserved)
|
||||||
|
stat.Reserved = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
stat.Available -= stat.Reserved
|
||||||
|
if stat.Available < 0 {
|
||||||
|
stat.Available = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
return stat, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *path) sectorPath(sid abi.SectorID, fileType SectorFileType) string {
|
||||||
|
return filepath.Join(p.local, fileType.String(), SectorName(sid))
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
|
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
|
||||||
@ -98,9 +143,12 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
|
|||||||
|
|
||||||
out := &path{
|
out := &path{
|
||||||
local: p,
|
local: p,
|
||||||
|
|
||||||
|
reserved: 0,
|
||||||
|
reservations: map[abi.SectorID]SectorFileType{},
|
||||||
}
|
}
|
||||||
|
|
||||||
fst, err := st.localStorage.Stat(p)
|
fst, err := out.stat(st.localStorage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -179,7 +227,7 @@ func (st *Local) reportHealth(ctx context.Context) {
|
|||||||
|
|
||||||
toReport := map[ID]HealthReport{}
|
toReport := map[ID]HealthReport{}
|
||||||
for id, p := range st.paths {
|
for id, p := range st.paths {
|
||||||
stat, err := st.localStorage.Stat(p.local)
|
stat, err := p.stat(st.localStorage)
|
||||||
|
|
||||||
toReport[id] = HealthReport{
|
toReport[id] = HealthReport{
|
||||||
Stat: stat,
|
Stat: stat,
|
||||||
@ -197,6 +245,61 @@ func (st *Local) reportHealth(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) {
|
||||||
|
ssize, err := spt.SectorSize()
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("getting sector size: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
st.localLk.Lock()
|
||||||
|
|
||||||
|
done := func() {}
|
||||||
|
deferredDone := func() { done() }
|
||||||
|
defer func() {
|
||||||
|
st.localLk.Unlock()
|
||||||
|
deferredDone()
|
||||||
|
}()
|
||||||
|
|
||||||
|
for _, fileType := range PathTypes {
|
||||||
|
if fileType&ft == 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
id := ID(PathByType(storageIDs, fileType))
|
||||||
|
|
||||||
|
p, ok := st.paths[id]
|
||||||
|
if !ok {
|
||||||
|
return nil, errPathNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
stat, err := p.stat(st.localStorage)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen
|
||||||
|
|
||||||
|
if stat.Available < overhead {
|
||||||
|
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)
|
||||||
|
}
|
||||||
|
|
||||||
|
p.reserved += overhead
|
||||||
|
|
||||||
|
prevDone := done
|
||||||
|
done = func() {
|
||||||
|
prevDone()
|
||||||
|
|
||||||
|
st.localLk.Lock()
|
||||||
|
defer st.localLk.Unlock()
|
||||||
|
|
||||||
|
p.reserved -= overhead
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
deferredDone = func() {}
|
||||||
|
return done, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
|
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) {
|
||||||
if existing|allocate != existing^allocate {
|
if existing|allocate != existing^allocate {
|
||||||
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector")
|
||||||
@ -229,7 +332,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
spath := filepath.Join(p.local, fileType.String(), SectorName(sid))
|
spath := p.sectorPath(sid, fileType)
|
||||||
SetPathByType(&out, fileType, spath)
|
SetPathByType(&out, fileType, spath)
|
||||||
SetPathByType(&storageIDs, fileType, string(info.ID))
|
SetPathByType(&storageIDs, fileType, string(info.ID))
|
||||||
|
|
||||||
@ -271,7 +374,7 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
|
|||||||
|
|
||||||
// TODO: Check free space
|
// TODO: Check free space
|
||||||
|
|
||||||
best = filepath.Join(p.local, fileType.String(), SectorName(sid))
|
best = p.sectorPath(sid, fileType)
|
||||||
bestID = si.ID
|
bestID = si.ID
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -387,7 +490,7 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
|
|||||||
return xerrors.Errorf("dropping sector from index: %w", err)
|
return xerrors.Errorf("dropping sector from index: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
spath := filepath.Join(p.local, typ.String(), SectorName(sid))
|
spath := p.sectorPath(sid, typ)
|
||||||
log.Infof("remove %s", spath)
|
log.Infof("remove %s", spath)
|
||||||
|
|
||||||
if err := os.RemoveAll(spath); err != nil {
|
if err := os.RemoveAll(spath); err != nil {
|
||||||
@ -398,12 +501,12 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error {
|
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error {
|
||||||
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, false, AcquireMove)
|
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, PathStorage, AcquireMove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire dest storage: %w", err)
|
return xerrors.Errorf("acquire dest storage: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, false, AcquireMove)
|
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("acquire src storage: %w", err)
|
return xerrors.Errorf("acquire src storage: %w", err)
|
||||||
}
|
}
|
||||||
@ -463,7 +566,7 @@ func (st *Local) FsStat(ctx context.Context, id ID) (FsStat, error) {
|
|||||||
return FsStat{}, errPathNotFound
|
return FsStat{}, errPathNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return st.localStorage.Stat(p.local)
|
return p.stat(st.localStorage)
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Store = &Local{}
|
var _ Store = &Local{}
|
||||||
|
@ -19,6 +19,10 @@ type TestingLocalStorage struct {
|
|||||||
c StorageConfig
|
c StorageConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestingLocalStorage) DiskUsage(path string) (int64, error) {
|
||||||
|
return 1, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
|
func (t *TestingLocalStorage) GetStorage() (StorageConfig, error) {
|
||||||
return t.c, nil
|
return t.c, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user