storage: Fix import cycle

This commit is contained in:
Łukasz Magiera 2020-09-06 18:54:00 +02:00
parent 159ce13f5e
commit 5d73943929
28 changed files with 323 additions and 318 deletions

View File

@ -284,18 +284,18 @@ type StorageMinerStruct struct {
SealingSchedDiag func(context.Context) (interface{}, error) `perm:"admin"` SealingSchedDiag func(context.Context) (interface{}, error) `perm:"admin"`
StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"` StorageList func(context.Context) (map[stores.ID][]stores.Decl, error) `perm:"admin"`
StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"` StorageLocal func(context.Context) (map[stores.ID]string, error) `perm:"admin"`
StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"` StorageStat func(context.Context, stores.ID) (fsutil.FsStat, error) `perm:"admin"`
StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"` StorageAttach func(context.Context, stores.StorageInfo, fsutil.FsStat) error `perm:"admin"`
StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType, bool) error `perm:"admin"` StorageDeclareSector func(context.Context, stores.ID, abi.SectorID, storiface.SectorFileType, bool) error `perm:"admin"`
StorageDropSector func(context.Context, stores.ID, abi.SectorID, stores.SectorFileType) error `perm:"admin"` StorageDropSector func(context.Context, stores.ID, abi.SectorID, storiface.SectorFileType) error `perm:"admin"`
StorageFindSector func(context.Context, abi.SectorID, stores.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"` StorageFindSector func(context.Context, abi.SectorID, storiface.SectorFileType, abi.RegisteredSealProof, bool) ([]stores.SectorStorageInfo, error) `perm:"admin"`
StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"` StorageInfo func(context.Context, stores.ID) (stores.StorageInfo, error) `perm:"admin"`
StorageBestAlloc func(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, sealing stores.PathType) ([]stores.StorageInfo, error) `perm:"admin"` StorageBestAlloc func(ctx context.Context, allocate storiface.SectorFileType, spt abi.RegisteredSealProof, sealing storiface.PathType) ([]stores.StorageInfo, error) `perm:"admin"`
StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"` StorageReportHealth func(ctx context.Context, id stores.ID, report stores.HealthReport) error `perm:"admin"`
StorageLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error `perm:"admin"` StorageLock func(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error `perm:"admin"`
StorageTryLock func(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) `perm:"admin"` StorageTryLock func(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) `perm:"admin"`
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
@ -329,22 +329,21 @@ type WorkerStruct struct {
Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"` Paths func(context.Context) ([]stores.StoragePath, error) `perm:"admin"`
Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"` Info func(context.Context) (storiface.WorkerInfo, error) `perm:"admin"`
AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"` AddPiece func(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) `perm:"admin"`
SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"` SealPreCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) `perm:"admin"`
SealPreCommit2 func(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"` SealPreCommit2 func(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) `perm:"admin"`
SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"` SealCommit1 func(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) `perm:"admin"`
SealCommit2 func(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"` SealCommit2 func(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) `perm:"admin"`
FinalizeSector func(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"` FinalizeSector func(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) `perm:"admin"`
ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"` ReleaseUnsealed func(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) `perm:"admin"`
MoveStorage func(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) (storiface.CallID, error) `perm:"admin"` MoveStorage func(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) `perm:"admin"`
UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"` UnsealPiece func(context.Context, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (storiface.CallID, error) `perm:"admin"`
ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"` ReadPiece func(context.Context, io.Writer, abi.SectorID, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) (storiface.CallID, error) `perm:"admin"` Fetch func(context.Context, abi.SectorID, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`
Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"` Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
Closing func(context.Context) (<-chan struct{}, error) `perm:"admin"` Closing func(context.Context) (<-chan struct{}, error) `perm:"admin"`
} }
} }
@ -1102,15 +1101,15 @@ func (c *StorageMinerStruct) StorageAttach(ctx context.Context, si stores.Storag
return c.Internal.StorageAttach(ctx, si, st) return c.Internal.StorageAttach(ctx, si, st)
} }
func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft stores.SectorFileType, primary bool) error { func (c *StorageMinerStruct) StorageDeclareSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
return c.Internal.StorageDeclareSector(ctx, storageId, s, ft, primary) return c.Internal.StorageDeclareSector(ctx, storageId, s, ft, primary)
} }
func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft stores.SectorFileType) error { func (c *StorageMinerStruct) StorageDropSector(ctx context.Context, storageId stores.ID, s abi.SectorID, ft storiface.SectorFileType) error {
return c.Internal.StorageDropSector(ctx, storageId, s, ft) return c.Internal.StorageDropSector(ctx, storageId, s, ft)
} }
func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types stores.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]stores.SectorStorageInfo, error) { func (c *StorageMinerStruct) StorageFindSector(ctx context.Context, si abi.SectorID, types storiface.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]stores.SectorStorageInfo, error) {
return c.Internal.StorageFindSector(ctx, si, types, spt, allowFetch) return c.Internal.StorageFindSector(ctx, si, types, spt, allowFetch)
} }
@ -1130,7 +1129,7 @@ func (c *StorageMinerStruct) StorageInfo(ctx context.Context, id stores.ID) (sto
return c.Internal.StorageInfo(ctx, id) return c.Internal.StorageInfo(ctx, id)
} }
func (c *StorageMinerStruct) StorageBestAlloc(ctx context.Context, allocate stores.SectorFileType, spt abi.RegisteredSealProof, pt stores.PathType) ([]stores.StorageInfo, error) { func (c *StorageMinerStruct) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, spt abi.RegisteredSealProof, pt storiface.PathType) ([]stores.StorageInfo, error) {
return c.Internal.StorageBestAlloc(ctx, allocate, spt, pt) return c.Internal.StorageBestAlloc(ctx, allocate, spt, pt)
} }
@ -1138,11 +1137,11 @@ func (c *StorageMinerStruct) StorageReportHealth(ctx context.Context, id stores.
return c.Internal.StorageReportHealth(ctx, id, report) return c.Internal.StorageReportHealth(ctx, id, report)
} }
func (c *StorageMinerStruct) StorageLock(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) error { func (c *StorageMinerStruct) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
return c.Internal.StorageLock(ctx, sector, read, write) return c.Internal.StorageLock(ctx, sector, read, write)
} }
func (c *StorageMinerStruct) StorageTryLock(ctx context.Context, sector abi.SectorID, read stores.SectorFileType, write stores.SectorFileType) (bool, error) { func (c *StorageMinerStruct) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
return c.Internal.StorageTryLock(ctx, sector, read, write) return c.Internal.StorageTryLock(ctx, sector, read, write)
} }
@ -1304,7 +1303,7 @@ func (w *WorkerStruct) ReleaseUnsealed(ctx context.Context, sector abi.SectorID,
return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree) return w.Internal.ReleaseUnsealed(ctx, sector, safeToFree)
} }
func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) (storiface.CallID, error) { func (w *WorkerStruct) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
return w.Internal.MoveStorage(ctx, sector, types) return w.Internal.MoveStorage(ctx, sector, types)
} }
@ -1316,7 +1315,7 @@ func (w *WorkerStruct) ReadPiece(ctx context.Context, sink io.Writer, sector abi
return w.Internal.ReadPiece(ctx, sink, sector, offset, size) return w.Internal.ReadPiece(ctx, sink, sector, offset, size)
} }
func (w *WorkerStruct) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) (storiface.CallID, error) { func (w *WorkerStruct) Fetch(ctx context.Context, id abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return w.Internal.Fetch(ctx, id, fileType, ptype, am) return w.Internal.Fetch(ctx, id, fileType, ptype, am)
} }

View File

@ -23,7 +23,7 @@ import (
lcli "github.com/filecoin-project/lotus/cli" lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/stores" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/builtin/miner" "github.com/filecoin-project/specs-actors/actors/builtin/miner"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
@ -612,7 +612,7 @@ func runSeals(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, numSectors int, par
if !skipunseal { if !skipunseal {
log.Infof("[%d] Unsealing sector", i) log.Infof("[%d] Unsealing sector", i)
{ {
p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, stores.FTUnsealed, stores.FTNone, stores.PathSealing) p, done, err := sbfs.AcquireSector(context.TODO(), abi.SectorID{Miner: mid, Number: 1}, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
if err != nil { if err != nil {
return xerrors.Errorf("acquire unsealed sector for removing: %w", err) return xerrors.Errorf("acquire unsealed sector for removing: %w", err)
} }

View File

@ -6,6 +6,7 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -187,7 +188,7 @@ func presealSector(sb *ffiwrapper.Sealer, sbfs *basicfs.Provider, sid abi.Sector
} }
func presealSectorFake(sbfs *basicfs.Provider, sid abi.SectorID, spt abi.RegisteredSealProof, ssize abi.SectorSize) (*genesis.PreSeal, error) { func presealSectorFake(sbfs *basicfs.Provider, sid abi.SectorID, spt abi.RegisteredSealProof, ssize abi.SectorSize) (*genesis.PreSeal, error) {
paths, done, err := sbfs.AcquireSector(context.TODO(), sid, 0, stores.FTSealed|stores.FTCache, stores.PathSealing) paths, done, err := sbfs.AcquireSector(context.TODO(), sid, 0, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil { if err != nil {
return nil, xerrors.Errorf("acquire unsealed sector: %w", err) return nil, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
@ -211,7 +212,7 @@ func presealSectorFake(sbfs *basicfs.Provider, sid abi.SectorID, spt abi.Registe
} }
func cleanupUnsealed(sbfs *basicfs.Provider, sid abi.SectorID) error { func cleanupUnsealed(sbfs *basicfs.Provider, sid abi.SectorID) error {
paths, done, err := sbfs.AcquireSector(context.TODO(), sid, stores.FTUnsealed, stores.FTNone, stores.PathSealing) paths, done, err := sbfs.AcquireSector(context.TODO(), sid, storiface.FTUnsealed, storiface.FTNone, storiface.PathSealing)
if err != nil { if err != nil {
return err return err
} }

View File

@ -3,6 +3,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
@ -294,17 +295,17 @@ var storageFindCmd = &cli.Command{
Number: abi.SectorNumber(snum), Number: abi.SectorNumber(snum),
} }
u, err := nodeApi.StorageFindSector(ctx, sid, stores.FTUnsealed, 0, false) u, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTUnsealed, 0, false)
if err != nil { if err != nil {
return xerrors.Errorf("finding unsealed: %w", err) return xerrors.Errorf("finding unsealed: %w", err)
} }
s, err := nodeApi.StorageFindSector(ctx, sid, stores.FTSealed, 0, false) s, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTSealed, 0, false)
if err != nil { if err != nil {
return xerrors.Errorf("finding sealed: %w", err) return xerrors.Errorf("finding sealed: %w", err)
} }
c, err := nodeApi.StorageFindSector(ctx, sid, stores.FTCache, 0, false) c, err := nodeApi.StorageFindSector(ctx, sid, storiface.FTCache, 0, false)
if err != nil { if err != nil {
return xerrors.Errorf("finding cache: %w", err) return xerrors.Errorf("finding cache: %w", err)
} }

View File

@ -3,12 +3,12 @@ package sectorstorage
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"os" "os"
"path/filepath" "path/filepath"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
) )
@ -32,7 +32,7 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
locked, err := m.index.StorageTryLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTNone) locked, err := m.index.StorageTryLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTNone)
if err != nil { if err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err) return xerrors.Errorf("acquiring sector lock: %w", err)
} }
@ -43,7 +43,7 @@ func (m *Manager) CheckProvable(ctx context.Context, spt abi.RegisteredSealProof
return nil return nil
} }
lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, stores.FTSealed|stores.FTCache, stores.FTNone, stores.PathStorage, stores.AcquireMove) lp, _, err := m.localStore.AcquireSector(ctx, sector, spt, storiface.FTSealed|storiface.FTCache, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil { if err != nil {
log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err) log.Warnw("CheckProvable Sector FAULT: acquire sector in checkProvable", "sector", sector, "error", err)
bad = append(bad, sector) bad = append(bad, sector)

View File

@ -8,13 +8,12 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
type sectorFile struct { type sectorFile struct {
abi.SectorID abi.SectorID
stores.SectorFileType storiface.SectorFileType
} }
type Provider struct { type Provider struct {
@ -24,24 +23,24 @@ type Provider struct {
waitSector map[sectorFile]chan struct{} waitSector map[sectorFile]chan struct{}
} }
func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error) { func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error) {
if err := os.Mkdir(filepath.Join(b.Root, stores.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint if err := os.Mkdir(filepath.Join(b.Root, storiface.FTUnsealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return stores.SectorPaths{}, nil, err return storiface.SectorPaths{}, nil, err
} }
if err := os.Mkdir(filepath.Join(b.Root, stores.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint if err := os.Mkdir(filepath.Join(b.Root, storiface.FTSealed.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return stores.SectorPaths{}, nil, err return storiface.SectorPaths{}, nil, err
} }
if err := os.Mkdir(filepath.Join(b.Root, stores.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint if err := os.Mkdir(filepath.Join(b.Root, storiface.FTCache.String()), 0755); err != nil && !os.IsExist(err) { // nolint
return stores.SectorPaths{}, nil, err return storiface.SectorPaths{}, nil, err
} }
done := func() {} done := func() {}
out := stores.SectorPaths{ out := storiface.SectorPaths{
ID: id, ID: id,
} }
for _, fileType := range stores.PathTypes { for _, fileType := range storiface.PathTypes {
if !existing.Has(fileType) && !allocate.Has(fileType) { if !existing.Has(fileType) && !allocate.Has(fileType) {
continue continue
} }
@ -61,10 +60,10 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
case ch <- struct{}{}: case ch <- struct{}{}:
case <-ctx.Done(): case <-ctx.Done():
done() done()
return stores.SectorPaths{}, nil, ctx.Err() return storiface.SectorPaths{}, nil, ctx.Err()
} }
path := filepath.Join(b.Root, fileType.String(), stores.SectorName(id)) path := filepath.Join(b.Root, fileType.String(), storiface.SectorName(id))
prevDone := done prevDone := done
done = func() { done = func() {
@ -75,11 +74,11 @@ func (b *Provider) AcquireSector(ctx context.Context, id abi.SectorID, existing
if !allocate.Has(fileType) { if !allocate.Has(fileType) {
if _, err := os.Stat(path); os.IsNotExist(err) { if _, err := os.Stat(path); os.IsNotExist(err) {
done() done()
return stores.SectorPaths{}, nil, storiface.ErrSectorNotFound return storiface.SectorPaths{}, nil, storiface.ErrSectorNotFound
} }
} }
stores.SetPathByType(&out, fileType, path) storiface.SetPathByType(&out, fileType, path)
} }
return out, done, nil return out, done, nil

View File

@ -21,7 +21,6 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/fr32" "github.com/filecoin-project/lotus/extern/sector-storage/fr32"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/lotus/extern/sector-storage/zerocomm" "github.com/filecoin-project/lotus/extern/sector-storage/zerocomm"
) )
@ -80,9 +79,9 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
} }
}() }()
var stagedPath stores.SectorPaths var stagedPath storiface.SectorPaths
if len(existingPieceSizes) == 0 { if len(existingPieceSizes) == 0 {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, stores.FTUnsealed, stores.PathSealing) stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, 0, storiface.FTUnsealed, storiface.PathSealing)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
@ -92,7 +91,7 @@ func (sb *Sealer) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err) return abi.PieceInfo{}, xerrors.Errorf("creating unsealed sector file: %w", err)
} }
} else { } else {
stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathSealing) stagedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathSealing)
if err != nil { if err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquire unsealed sector: %w", err)
} }
@ -199,12 +198,12 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
maxPieceSize := abi.PaddedPieceSize(sb.ssize) maxPieceSize := abi.PaddedPieceSize(sb.ssize)
// try finding existing // try finding existing
unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage) unsealedPath, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
var pf *partialFile var pf *partialFile
switch { switch {
case xerrors.Is(err, storiface.ErrSectorNotFound): case xerrors.Is(err, storiface.ErrSectorNotFound):
unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, stores.FTNone, stores.FTUnsealed, stores.PathStorage) unsealedPath, done, err = sb.sectors.AcquireSector(ctx, sector, storiface.FTNone, storiface.FTUnsealed, storiface.PathStorage)
if err != nil { if err != nil {
return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err) return xerrors.Errorf("acquire unsealed sector path (allocate): %w", err)
} }
@ -241,7 +240,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
return nil return nil
} }
srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache|stores.FTSealed, stores.FTNone, stores.PathStorage) srcPaths, srcDone, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathStorage)
if err != nil { if err != nil {
return xerrors.Errorf("acquire sealed sector paths: %w", err) return xerrors.Errorf("acquire sealed sector paths: %w", err)
} }
@ -362,7 +361,7 @@ func (sb *Sealer) UnsealPiece(ctx context.Context, sector abi.SectorID, offset s
} }
func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) { func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.SectorID, offset storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
path, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTNone, stores.PathStorage) path, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
if err != nil { if err != nil {
return false, xerrors.Errorf("acquire unsealed sector path: %w", err) return false, xerrors.Errorf("acquire unsealed sector path: %w", err)
} }
@ -414,7 +413,7 @@ func (sb *Sealer) ReadPiece(ctx context.Context, writer io.Writer, sector abi.Se
} }
func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) { func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (out storage.PreCommit1Out, err error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache, stores.PathSealing) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache, storiface.PathSealing)
if err != nil { if err != nil {
return nil, xerrors.Errorf("acquiring sector paths: %w", err) return nil, xerrors.Errorf("acquiring sector paths: %w", err)
} }
@ -471,7 +470,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
} }
func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) { func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase1Out storage.PreCommit1Out) (storage.SectorCids, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil { if err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err) return storage.SectorCids{}, xerrors.Errorf("acquiring sector paths: %w", err)
} }
@ -489,7 +488,7 @@ func (sb *Sealer) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
} }
func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { func (sb *Sealer) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTSealed|stores.FTCache, 0, stores.PathSealing) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTSealed|storiface.FTCache, 0, storiface.PathSealing)
if err != nil { if err != nil {
return nil, xerrors.Errorf("acquire sector paths: %w", err) return nil, xerrors.Errorf("acquire sector paths: %w", err)
} }
@ -539,7 +538,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
} }
} }
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTUnsealed, 0, stores.PathStorage) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTUnsealed, 0, storiface.PathStorage)
if err != nil { if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err) return xerrors.Errorf("acquiring sector cache path: %w", err)
} }
@ -576,7 +575,7 @@ func (sb *Sealer) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
} }
} }
paths, done, err := sb.sectors.AcquireSector(ctx, sector, stores.FTCache, 0, stores.PathStorage) paths, done, err := sb.sectors.AcquireSector(ctx, sector, storiface.FTCache, 0, storiface.PathStorage)
if err != nil { if err != nil {
return xerrors.Errorf("acquiring sector cache path: %w", err) return xerrors.Errorf("acquiring sector cache path: %w", err)
} }

View File

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
@ -28,7 +29,6 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
) )
func init() { func init() {
@ -123,7 +123,7 @@ func (s *seal) unseal(t *testing.T, sb *Sealer, sp *basicfs.Provider, si abi.Sec
t.Fatal("read wrong bytes") t.Fatal("read wrong bytes")
} }
p, sd, err := sp.AcquireSector(context.TODO(), si, stores.FTUnsealed, stores.FTNone, stores.PathStorage) p, sd, err := sp.AcquireSector(context.TODO(), si, storiface.FTUnsealed, storiface.FTNone, storiface.PathStorage)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -10,13 +10,12 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper/basicfs"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
type Validator interface { type Validator interface {
CanCommit(sector stores.SectorPaths) (bool, error) CanCommit(sector storiface.SectorPaths) (bool, error)
CanProve(sector stores.SectorPaths) (bool, error) CanProve(sector storiface.SectorPaths) (bool, error)
} }
type StorageSealer interface { type StorageSealer interface {
@ -43,7 +42,7 @@ type Verifier interface {
type SectorProvider interface { type SectorProvider interface {
// * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist // * returns storiface.ErrSectorNotFound if a requested existing sector doesn't exist
// * returns an error when allocate is set, and existing isn't, and the sector exists // * returns an error when allocate is set, and existing isn't, and the sector exists
AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, ptype stores.PathType) (stores.SectorPaths, func(), error) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, ptype storiface.PathType) (storiface.SectorPaths, func(), error)
} }
var _ SectorProvider = &basicfs.Provider{} var _ SectorProvider = &basicfs.Provider{}

View File

@ -4,6 +4,7 @@ package ffiwrapper
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -11,8 +12,6 @@ import (
ffi "github.com/filecoin-project/filecoin-ffi" ffi "github.com/filecoin-project/filecoin-ffi"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"go.opencensus.io/trace" "go.opencensus.io/trace"
) )
@ -64,7 +63,7 @@ func (sb *Sealer) pubSectorToPriv(ctx context.Context, mid abi.ActorID, sectorIn
sid := abi.SectorID{Miner: mid, Number: s.SectorNumber} sid := abi.SectorID{Miner: mid, Number: s.SectorNumber}
paths, d, err := sb.sectors.AcquireSector(ctx, sid, stores.FTCache|stores.FTSealed, 0, stores.PathStorage) paths, d, err := sb.sectors.AcquireSector(ctx, sid, storiface.FTCache|storiface.FTSealed, 0, storiface.PathStorage)
if err != nil { if err != nil {
log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err) log.Warnw("failed to acquire sector, skipping", "sector", sid, "error", err)
skipped = append(skipped, sid) skipped = append(skipped, sid)

View File

@ -22,7 +22,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
var pathTypes = []stores.SectorFileType{stores.FTUnsealed, stores.FTSealed, stores.FTCache} var pathTypes = []storiface.SectorFileType{storiface.FTUnsealed, storiface.FTSealed, storiface.FTCache}
type WorkerConfig struct { type WorkerConfig struct {
SealProof abi.RegisteredSealProof SealProof abi.RegisteredSealProof
@ -59,19 +59,19 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
type localWorkerPathProvider struct { type localWorkerPathProvider struct {
w *LocalWorker w *LocalWorker
op stores.AcquireMode op storiface.AcquireMode
} }
func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) { func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op) paths, storageIDs, err := l.w.storage.AcquireSector(ctx, sector, l.w.scfg.SealProofType, existing, allocate, sealing, l.op)
if err != nil { if err != nil {
return stores.SectorPaths{}, nil, err return storiface.SectorPaths{}, nil, err
} }
releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, stores.FSOverheadSeal) releaseStorage, err := l.w.localStore.Reserve(ctx, sector, l.w.scfg.SealProofType, allocate, storageIDs, storiface.FSOverheadSeal)
if err != nil { if err != nil {
return stores.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err) return storiface.SectorPaths{}, nil, xerrors.Errorf("reserving storage space: %w", err)
} }
log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths) log.Debugf("acquired sector %d (e:%d; a:%d): %v", sector, existing, allocate, paths)
@ -84,9 +84,9 @@ func (l *localWorkerPathProvider) AcquireSector(ctx context.Context, sector abi.
continue continue
} }
sid := stores.PathByType(storageIDs, fileType) sid := storiface.PathByType(storageIDs, fileType)
if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == stores.AcquireMove); err != nil { if err := l.w.sindex.StorageDeclareSector(ctx, stores.ID(sid), sector, fileType, l.op == storiface.AcquireMove); err != nil {
log.Errorf("declare sector error: %+v", err) log.Errorf("declare sector error: %+v", err)
} }
} }
@ -140,9 +140,9 @@ func (l *LocalWorker) AddPiece(ctx context.Context, sector abi.SectorID, epcs []
}) })
} }
func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) (storiface.CallID, error) { func (l *LocalWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(sector, func(ci storiface.CallID) {
_, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, stores.FTNone, ptype) _, done, err := (&localWorkerPathProvider{w: l, op: am}).AcquireSector(ctx, sector, fileType, storiface.FTNone, ptype)
if err == nil { if err == nil {
done() done()
} }
@ -165,12 +165,12 @@ func (l *LocalWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, t
{ {
// cleanup previous failed attempts if they exist // cleanup previous failed attempts if they exist
if err = l.storage.Remove(ctx, sector, stores.FTSealed, true); err != nil { if err = l.storage.Remove(ctx, sector, storiface.FTSealed, true); err != nil {
err = xerrors.Errorf("cleaning up sealed data: %w", err) err = xerrors.Errorf("cleaning up sealed data: %w", err)
return return
} }
if err = l.storage.Remove(ctx, sector, stores.FTCache, true); err != nil { if err = l.storage.Remove(ctx, sector, storiface.FTCache, true); err != nil {
err = xerrors.Errorf("cleaning up cache data: %w", err) err = xerrors.Errorf("cleaning up cache data: %w", err)
return return
} }
@ -264,20 +264,20 @@ func (l *LocalWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID,
func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error { func (l *LocalWorker) Remove(ctx context.Context, sector abi.SectorID) error {
var err error var err error
if rerr := l.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil { if rerr := l.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
} }
if rerr := l.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil { if rerr := l.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
} }
if rerr := l.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil { if rerr := l.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
} }
return err return err
} }
func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) (storiface.CallID, error) { func (l *LocalWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
return l.asyncCall(sector, func(ci storiface.CallID) { return l.asyncCall(sector, func(ci storiface.CallID) {
err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types) err := l.storage.MoveStorage(ctx, sector, l.scfg.SealProofType, types)
@ -306,12 +306,12 @@ func (l *LocalWorker) UnsealPiece(ctx context.Context, sector abi.SectorID, inde
return return
} }
if err = l.storage.RemoveCopies(ctx, sector, stores.FTSealed); err != nil { if err = l.storage.RemoveCopies(ctx, sector, storiface.FTSealed); err != nil {
err = xerrors.Errorf("removing source data: %w", err) err = xerrors.Errorf("removing source data: %w", err)
return return
} }
if err = l.storage.RemoveCopies(ctx, sector, stores.FTCache); err != nil { if err = l.storage.RemoveCopies(ctx, sector, storiface.FTCache); err != nil {
err = xerrors.Errorf("removing source data: %w", err) err = xerrors.Errorf("removing source data: %w", err)
return return
} }

View File

@ -191,7 +191,7 @@ func schedNop(context.Context, Worker) error {
return nil return nil
} }
func schedFetch(sector abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) func(context.Context, Worker) error { func schedFetch(sector abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) func(context.Context, Worker) error {
return func(ctx context.Context, worker Worker) error { return func(ctx context.Context, worker Worker) error {
return worker.Fetch(ctx, sector, ft, ptype, am) return worker.Fetch(ctx, sector, ft, ptype, am)
} }
@ -201,21 +201,21 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed|stores.FTCache, stores.FTUnsealed); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.FTUnsealed); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err) return xerrors.Errorf("acquiring sector lock: %w", err)
} }
// passing 0 spt because we only need it when allowFetch is true // passing 0 spt because we only need it when allowFetch is true
best, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) best, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false)
if err != nil { if err != nil {
return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err) return xerrors.Errorf("read piece: checking for already existing unsealed sector: %w", err)
} }
var selector WorkerSelector var selector WorkerSelector
if len(best) == 0 { // new if len(best) == 0 { // new
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
} else { // append to existing } else { // append to existing
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
} }
var readOk bool var readOk bool
@ -223,9 +223,9 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
if len(best) > 0 { if len(best) > 0 {
// There is unsealed sector, see if we can read from it // There is unsealed sector, see if we can read from it
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
return err return err
}) })
@ -239,12 +239,12 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
} }
unsealFetch := func(ctx context.Context, worker Worker) error { unsealFetch := func(ctx context.Context, worker Worker) error {
if err := worker.Fetch(ctx, sector, stores.FTSealed|stores.FTCache, stores.PathSealing, stores.AcquireCopy); err != nil { if err := worker.Fetch(ctx, sector, storiface.FTSealed|storiface.FTCache, storiface.PathSealing, storiface.AcquireCopy); err != nil {
return xerrors.Errorf("copy sealed/cache sector data: %w", err) return xerrors.Errorf("copy sealed/cache sector data: %w", err)
} }
if len(best) > 0 { if len(best) > 0 {
if err := worker.Fetch(ctx, sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove); err != nil { if err := worker.Fetch(ctx, sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove); err != nil {
return xerrors.Errorf("copy unsealed sector data: %w", err) return xerrors.Errorf("copy unsealed sector data: %w", err)
} }
} }
@ -258,9 +258,9 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect
return err return err
} }
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTReadUnsealed, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
readOk, err = w.ReadPiece(ctx, sink, sector, offset, size) readOk, err = w.ReadPiece(ctx, sink, sector, offset, size)
return err return err
}) })
@ -284,16 +284,16 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTUnsealed); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTUnsealed); err != nil {
return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err) return abi.PieceInfo{}, xerrors.Errorf("acquiring sector lock: %w", err)
} }
var selector WorkerSelector var selector WorkerSelector
var err error var err error
if len(existingPieces) == 0 { // new if len(existingPieces) == 0 { // new
selector = newAllocSelector(m.index, stores.FTUnsealed, stores.PathSealing) selector = newAllocSelector(m.index, storiface.FTUnsealed, storiface.PathSealing)
} else { // use existing } else { // use existing
selector = newExistingSelector(m.index, sector, stores.FTUnsealed, false) selector = newExistingSelector(m.index, sector, storiface.FTUnsealed, false)
} }
var out abi.PieceInfo var out abi.PieceInfo
@ -313,15 +313,15 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTUnsealed, stores.FTSealed|stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache); err != nil {
return nil, xerrors.Errorf("acquiring sector lock: %w", err) return nil, xerrors.Errorf("acquiring sector lock: %w", err)
} }
// TODO: also consider where the unsealed data sits // TODO: also consider where the unsealed data sits
selector := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) selector := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathSealing)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, storiface.FTUnsealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) p, err := w.SealPreCommit1(ctx, sector, ticket, pieces)
if err != nil { if err != nil {
return err return err
@ -337,13 +337,13 @@ func (m *Manager) SealPreCommit2(ctx context.Context, sector abi.SectorID, phase
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err) return storage.SectorCids{}, xerrors.Errorf("acquiring sector lock: %w", err)
} }
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, true) selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, true)
err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit2, selector, schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealPreCommit2(ctx, sector, phase1Out) p, err := w.SealPreCommit2(ctx, sector, phase1Out)
if err != nil { if err != nil {
return err return err
@ -358,16 +358,16 @@ func (m *Manager) SealCommit1(ctx context.Context, sector abi.SectorID, ticket a
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTSealed, stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTSealed, storiface.FTCache); err != nil {
return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err) return storage.Commit1Out{}, xerrors.Errorf("acquiring sector lock: %w", err)
} }
// NOTE: We set allowFetch to false in so that we always execute on a worker // NOTE: We set allowFetch to false in so that we always execute on a worker
// with direct access to the data. We want to do that because this step is // with direct access to the data. We want to do that because this step is
// generally very cheap / fast, and transferring data is not worth the effort // generally very cheap / fast, and transferring data is not worth the effort
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, stores.FTCache|stores.FTSealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { err = m.sched.Schedule(ctx, sector, sealtasks.TTCommit1, selector, schedFetch(sector, storiface.FTCache|storiface.FTSealed, storiface.PathSealing, storiface.AcquireMove), func(ctx context.Context, w Worker) error {
p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids) p, err := w.SealCommit1(ctx, sector, ticket, seed, pieces, cids)
if err != nil { if err != nil {
return err return err
@ -397,26 +397,26 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err) return xerrors.Errorf("acquiring sector lock: %w", err)
} }
unsealed := stores.FTUnsealed unsealed := storiface.FTUnsealed
{ {
unsealedStores, err := m.index.StorageFindSector(ctx, sector, stores.FTUnsealed, 0, false) unsealedStores, err := m.index.StorageFindSector(ctx, sector, storiface.FTUnsealed, 0, false)
if err != nil { if err != nil {
return xerrors.Errorf("finding unsealed sector: %w", err) return xerrors.Errorf("finding unsealed sector: %w", err)
} }
if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine if len(unsealedStores) == 0 { // Is some edge-cases unsealed sector may not exist already, that's fine
unsealed = stores.FTNone unsealed = storiface.FTNone
} }
} }
selector := newExistingSelector(m.index, sector, stores.FTCache|stores.FTSealed, false) selector := newExistingSelector(m.index, sector, storiface.FTCache|storiface.FTSealed, false)
err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector, err := m.sched.Schedule(ctx, sector, sealtasks.TTFinalize, selector,
schedFetch(sector, stores.FTCache|stores.FTSealed|unsealed, stores.PathSealing, stores.AcquireMove), schedFetch(sector, storiface.FTCache|storiface.FTSealed|unsealed, storiface.PathSealing, storiface.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
return w.FinalizeSector(ctx, sector, keepUnsealed) return w.FinalizeSector(ctx, sector, keepUnsealed)
}) })
@ -424,18 +424,18 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU
return err return err
} }
fetchSel := newAllocSelector(m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) fetchSel := newAllocSelector(m.index, storiface.FTCache|storiface.FTSealed, storiface.PathStorage)
moveUnsealed := unsealed moveUnsealed := unsealed
{ {
if len(keepUnsealed) == 0 { if len(keepUnsealed) == 0 {
moveUnsealed = stores.FTNone moveUnsealed = storiface.FTNone
} }
} }
err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel, err = m.sched.Schedule(ctx, sector, sealtasks.TTFetch, fetchSel,
schedFetch(sector, stores.FTCache|stores.FTSealed|moveUnsealed, stores.PathStorage, stores.AcquireMove), schedFetch(sector, storiface.FTCache|storiface.FTSealed|moveUnsealed, storiface.PathStorage, storiface.AcquireMove),
func(ctx context.Context, w Worker) error { func(ctx context.Context, w Worker) error {
return w.MoveStorage(ctx, sector, stores.FTCache|stores.FTSealed|moveUnsealed) return w.MoveStorage(ctx, sector, storiface.FTCache|storiface.FTSealed|moveUnsealed)
}) })
if err != nil { if err != nil {
return xerrors.Errorf("moving sector to storage: %w", err) return xerrors.Errorf("moving sector to storage: %w", err)
@ -453,19 +453,19 @@ func (m *Manager) Remove(ctx context.Context, sector abi.SectorID) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
if err := m.index.StorageLock(ctx, sector, stores.FTNone, stores.FTSealed|stores.FTUnsealed|stores.FTCache); err != nil { if err := m.index.StorageLock(ctx, sector, storiface.FTNone, storiface.FTSealed|storiface.FTUnsealed|storiface.FTCache); err != nil {
return xerrors.Errorf("acquiring sector lock: %w", err) return xerrors.Errorf("acquiring sector lock: %w", err)
} }
var err error var err error
if rerr := m.storage.Remove(ctx, sector, stores.FTSealed, true); rerr != nil { if rerr := m.storage.Remove(ctx, sector, storiface.FTSealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (sealed): %w", rerr))
} }
if rerr := m.storage.Remove(ctx, sector, stores.FTCache, true); rerr != nil { if rerr := m.storage.Remove(ctx, sector, storiface.FTCache, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (cache): %w", rerr))
} }
if rerr := m.storage.Remove(ctx, sector, stores.FTUnsealed, true); rerr != nil { if rerr := m.storage.Remove(ctx, sector, storiface.FTUnsealed, true); rerr != nil {
err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr)) err = multierror.Append(err, xerrors.Errorf("removing sector (unsealed): %w", rerr))
} }

View File

@ -2,6 +2,7 @@ package sectorstorage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -16,25 +17,25 @@ type readonlyProvider struct {
spt abi.RegisteredSealProof spt abi.RegisteredSealProof
} }
func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing stores.SectorFileType, allocate stores.SectorFileType, sealing stores.PathType) (stores.SectorPaths, func(), error) { func (l *readonlyProvider) AcquireSector(ctx context.Context, id abi.SectorID, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
if allocate != stores.FTNone { if allocate != storiface.FTNone {
return stores.SectorPaths{}, nil, xerrors.New("read-only storage") return storiface.SectorPaths{}, nil, xerrors.New("read-only storage")
} }
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
// use TryLock to avoid blocking // use TryLock to avoid blocking
locked, err := l.index.StorageTryLock(ctx, id, existing, stores.FTNone) locked, err := l.index.StorageTryLock(ctx, id, existing, storiface.FTNone)
if err != nil { if err != nil {
cancel() cancel()
return stores.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err) return storiface.SectorPaths{}, nil, xerrors.Errorf("acquiring sector lock: %w", err)
} }
if !locked { if !locked {
cancel() cancel()
return stores.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock") return storiface.SectorPaths{}, nil, xerrors.Errorf("failed to acquire sector lock")
} }
p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing, stores.AcquireMove) p, _, err := l.stor.AcquireSector(ctx, id, l.spt, existing, allocate, sealing, storiface.AcquireMove)
return p, cancel, err return p, cancel, err
} }

View File

@ -82,11 +82,11 @@ func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pie
panic("implement me") panic("implement me")
} }
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) error { func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) error {
panic("implement me") panic("implement me")
} }
func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error {
panic("implement me") panic("implement me")
} }
@ -215,7 +215,7 @@ func TestSched(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
rm.done[taskName] = done rm.done[taskName] = done
sel := newAllocSelector(index, stores.FTCache, stores.PathSealing) sel := newAllocSelector(index, storiface.FTCache, storiface.PathSealing)
rm.wg.Add(1) rm.wg.Add(1)
go func() { go func() {

View File

@ -2,6 +2,7 @@ package sectorstorage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -13,11 +14,11 @@ import (
type allocSelector struct { type allocSelector struct {
index stores.SectorIndex index stores.SectorIndex
alloc stores.SectorFileType alloc storiface.SectorFileType
ptype stores.PathType ptype storiface.PathType
} }
func newAllocSelector(index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) *allocSelector { func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType, ptype storiface.PathType) *allocSelector {
return &allocSelector{ return &allocSelector{
index: index, index: index,
alloc: alloc, alloc: alloc,

View File

@ -2,6 +2,7 @@ package sectorstorage
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -14,11 +15,11 @@ import (
type existingSelector struct { type existingSelector struct {
index stores.SectorIndex index stores.SectorIndex
sector abi.SectorID sector abi.SectorID
alloc stores.SectorFileType alloc storiface.SectorFileType
allowFetch bool allowFetch bool
} }
func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc stores.SectorFileType, allowFetch bool) *existingSelector { func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, allowFetch bool) *existingSelector {
return &existingSelector{ return &existingSelector{
index: index, index: index,
sector: sector, sector: sector,

View File

@ -2,6 +2,7 @@ package stores
import ( import (
"encoding/json" "encoding/json"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"io" "io"
"net/http" "net/http"
"os" "os"
@ -55,7 +56,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
log.Infof("SERVE GET %s", r.URL) log.Infof("SERVE GET %s", r.URL)
vars := mux.Vars(r) vars := mux.Vars(r)
id, err := ParseSectorID(vars["id"]) id, err := storiface.ParseSectorID(vars["id"])
if err != nil { if err != nil {
log.Error("%+v", err) log.Error("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)
@ -72,7 +73,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
// The caller has a lock on this sector already, no need to get one here // The caller has a lock on this sector already, no need to get one here
// passing 0 spt because we don't allocate anything // passing 0 spt because we don't allocate anything
paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, FTNone, PathStorage, AcquireMove) paths, _, err := handler.Local.AcquireSector(r.Context(), id, 0, ft, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil { if err != nil {
log.Error("%+v", err) log.Error("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)
@ -81,7 +82,7 @@ func (handler *FetchHandler) remoteGetSector(w http.ResponseWriter, r *http.Requ
// TODO: reserve local storage here // TODO: reserve local storage here
path := PathByType(paths, ft) path := storiface.PathByType(paths, ft)
if path == "" { if path == "" {
log.Error("acquired path was empty") log.Error("acquired path was empty")
w.WriteHeader(500) w.WriteHeader(500)
@ -120,7 +121,7 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
log.Infof("SERVE DELETE %s", r.URL) log.Infof("SERVE DELETE %s", r.URL)
vars := mux.Vars(r) vars := mux.Vars(r)
id, err := ParseSectorID(vars["id"]) id, err := storiface.ParseSectorID(vars["id"])
if err != nil { if err != nil {
log.Error("%+v", err) log.Error("%+v", err)
w.WriteHeader(500) w.WriteHeader(500)
@ -141,14 +142,14 @@ func (handler *FetchHandler) remoteDeleteSector(w http.ResponseWriter, r *http.R
} }
} }
func ftFromString(t string) (SectorFileType, error) { func ftFromString(t string) (storiface.SectorFileType, error) {
switch t { switch t {
case FTUnsealed.String(): case storiface.FTUnsealed.String():
return FTUnsealed, nil return storiface.FTUnsealed, nil
case FTSealed.String(): case storiface.FTSealed.String():
return FTSealed, nil return storiface.FTSealed, nil
case FTCache.String(): case storiface.FTCache.String():
return FTCache, nil return storiface.FTCache, nil
default: default:
return 0, xerrors.Errorf("unknown sector file type: '%s'", t) return 0, xerrors.Errorf("unknown sector file type: '%s'", t)
} }

View File

@ -2,6 +2,7 @@ package stores
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"net/url" "net/url"
gopath "path" gopath "path"
"sort" "sort"
@ -53,20 +54,20 @@ type SectorIndex interface { // part of storage-miner api
StorageInfo(context.Context, ID) (StorageInfo, error) StorageInfo(context.Context, ID) (StorageInfo, error)
StorageReportHealth(context.Context, ID, HealthReport) error StorageReportHealth(context.Context, ID, HealthReport) error
StorageDeclareSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType, primary bool) error StorageDeclareSector(ctx context.Context, storageID ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error
StorageDropSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType) error StorageDropSector(ctx context.Context, storageID ID, s abi.SectorID, ft storiface.SectorFileType) error
StorageFindSector(ctx context.Context, sector abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) StorageFindSector(ctx context.Context, sector abi.SectorID, ft storiface.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error)
StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, spt abi.RegisteredSealProof, pathType storiface.PathType) ([]StorageInfo, error)
// atomically acquire locks on all sector file types. close ctx to unlock // atomically acquire locks on all sector file types. close ctx to unlock
StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error
StorageTryLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error)
} }
type Decl struct { type Decl struct {
abi.SectorID abi.SectorID
SectorFileType storiface.SectorFileType
} }
type declMeta struct { type declMeta struct {
@ -104,10 +105,10 @@ func (i *Index) StorageList(ctx context.Context) (map[ID][]Decl, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
byID := map[ID]map[abi.SectorID]SectorFileType{} byID := map[ID]map[abi.SectorID]storiface.SectorFileType{}
for id := range i.stores { for id := range i.stores {
byID[id] = map[abi.SectorID]SectorFileType{} byID[id] = map[abi.SectorID]storiface.SectorFileType{}
} }
for decl, ids := range i.sectors { for decl, ids := range i.sectors {
for _, id := range ids { for _, id := range ids {
@ -180,12 +181,12 @@ func (i *Index) StorageReportHealth(ctx context.Context, id ID, report HealthRep
return nil return nil
} }
func (i *Index) StorageDeclareSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType, primary bool) error { func (i *Index) StorageDeclareSector(ctx context.Context, storageID ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error {
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()
loop: loop:
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
} }
@ -212,11 +213,11 @@ loop:
return nil return nil
} }
func (i *Index) StorageDropSector(ctx context.Context, storageID ID, s abi.SectorID, ft SectorFileType) error { func (i *Index) StorageDropSector(ctx context.Context, storageID ID, s abi.SectorID, ft storiface.SectorFileType) error {
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
} }
@ -246,14 +247,14 @@ func (i *Index) StorageDropSector(ctx context.Context, storageID ID, s abi.Secto
return nil return nil
} }
func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) { func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, spt abi.RegisteredSealProof, allowFetch bool) ([]SectorStorageInfo, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
storageIDs := map[ID]uint64{} storageIDs := map[ID]uint64{}
isprimary := map[ID]bool{} isprimary := map[ID]bool{}
for _, pathType := range PathTypes { for _, pathType := range storiface.PathTypes {
if ft&pathType == 0 { if ft&pathType == 0 {
continue continue
} }
@ -280,7 +281,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
return nil, xerrors.Errorf("failed to parse url: %w", err) return nil, xerrors.Errorf("failed to parse url: %w", err)
} }
rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s)) rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls[k] = rl.String() urls[k] = rl.String()
} }
@ -333,7 +334,7 @@ func (i *Index) StorageFindSector(ctx context.Context, s abi.SectorID, ft Sector
return nil, xerrors.Errorf("failed to parse url: %w", err) return nil, xerrors.Errorf("failed to parse url: %w", err)
} }
rl.Path = gopath.Join(rl.Path, ft.String(), SectorName(s)) rl.Path = gopath.Join(rl.Path, ft.String(), storiface.SectorName(s))
urls[k] = rl.String() urls[k] = rl.String()
} }
@ -365,7 +366,7 @@ func (i *Index) StorageInfo(ctx context.Context, id ID) (StorageInfo, error) {
return *si.info, nil return *si.info, nil
} }
func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, spt abi.RegisteredSealProof, pathType PathType) ([]StorageInfo, error) { func (i *Index) StorageBestAlloc(ctx context.Context, allocate storiface.SectorFileType, spt abi.RegisteredSealProof, pathType storiface.PathType) ([]StorageInfo, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()
@ -377,10 +378,10 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
} }
for _, p := range i.stores { for _, p := range i.stores {
if (pathType == PathSealing) && !p.info.CanSeal { if (pathType == storiface.PathSealing) && !p.info.CanSeal {
continue continue
} }
if (pathType == PathStorage) && !p.info.CanStore { if (pathType == storiface.PathStorage) && !p.info.CanStore {
continue continue
} }
@ -421,7 +422,7 @@ func (i *Index) StorageBestAlloc(ctx context.Context, allocate SectorFileType, s
return out, nil return out, nil
} }
func (i *Index) FindSector(id abi.SectorID, typ SectorFileType) ([]ID, error) { func (i *Index) FindSector(id abi.SectorID, typ storiface.SectorFileType) ([]ID, error) {
i.lk.RLock() i.lk.RLock()
defer i.lk.RUnlock() defer i.lk.RUnlock()

View File

@ -2,6 +2,7 @@ package stores
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"sync" "sync"
"golang.org/x/xerrors" "golang.org/x/xerrors"
@ -12,13 +13,13 @@ import (
type sectorLock struct { type sectorLock struct {
cond *ctxCond cond *ctxCond
r [FileTypes]uint r [storiface.FileTypes]uint
w SectorFileType w storiface.SectorFileType
refs uint // access with indexLocks.lk refs uint // access with indexLocks.lk
} }
func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool { func (l *sectorLock) canLock(read storiface.SectorFileType, write storiface.SectorFileType) bool {
for i, b := range write.All() { for i, b := range write.All() {
if b && l.r[i] > 0 { if b && l.r[i] > 0 {
return false return false
@ -29,7 +30,7 @@ func (l *sectorLock) canLock(read SectorFileType, write SectorFileType) bool {
return l.w&read == 0 && l.w&write == 0 return l.w&read == 0 && l.w&write == 0
} }
func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool { func (l *sectorLock) tryLock(read storiface.SectorFileType, write storiface.SectorFileType) bool {
if !l.canLock(read, write) { if !l.canLock(read, write) {
return false return false
} }
@ -45,16 +46,16 @@ func (l *sectorLock) tryLock(read SectorFileType, write SectorFileType) bool {
return true return true
} }
type lockFn func(l *sectorLock, ctx context.Context, read SectorFileType, write SectorFileType) (bool, error) type lockFn func(l *sectorLock, ctx context.Context, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error)
func (l *sectorLock) tryLockSafe(ctx context.Context, read SectorFileType, write SectorFileType) (bool, error) { func (l *sectorLock) tryLockSafe(ctx context.Context, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
l.cond.L.Lock() l.cond.L.Lock()
defer l.cond.L.Unlock() defer l.cond.L.Unlock()
return l.tryLock(read, write), nil return l.tryLock(read, write), nil
} }
func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write SectorFileType) (bool, error) { func (l *sectorLock) lock(ctx context.Context, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
l.cond.L.Lock() l.cond.L.Lock()
defer l.cond.L.Unlock() defer l.cond.L.Unlock()
@ -67,7 +68,7 @@ func (l *sectorLock) lock(ctx context.Context, read SectorFileType, write Sector
return true, nil return true, nil
} }
func (l *sectorLock) unlock(read SectorFileType, write SectorFileType) { func (l *sectorLock) unlock(read storiface.SectorFileType, write storiface.SectorFileType) {
l.cond.L.Lock() l.cond.L.Lock()
defer l.cond.L.Unlock() defer l.cond.L.Unlock()
@ -88,12 +89,12 @@ type indexLocks struct {
locks map[abi.SectorID]*sectorLock locks map[abi.SectorID]*sectorLock
} }
func (i *indexLocks) lockWith(ctx context.Context, lockFn lockFn, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error) { func (i *indexLocks) lockWith(ctx context.Context, lockFn lockFn, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
if read|write == 0 { if read|write == 0 {
return false, nil return false, nil
} }
if read|write > (1<<FileTypes)-1 { if read|write > (1<<storiface.FileTypes)-1 {
return false, xerrors.Errorf("unknown file types specified") return false, xerrors.Errorf("unknown file types specified")
} }
@ -136,7 +137,7 @@ func (i *indexLocks) lockWith(ctx context.Context, lockFn lockFn, sector abi.Sec
return true, nil return true, nil
} }
func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) error { func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) error {
ok, err := i.lockWith(ctx, (*sectorLock).lock, sector, read, write) ok, err := i.lockWith(ctx, (*sectorLock).lock, sector, read, write)
if err != nil { if err != nil {
return err return err
@ -149,6 +150,6 @@ func (i *indexLocks) StorageLock(ctx context.Context, sector abi.SectorID, read
return nil return nil
} }
func (i *indexLocks) StorageTryLock(ctx context.Context, sector abi.SectorID, read SectorFileType, write SectorFileType) (bool, error) { func (i *indexLocks) StorageTryLock(ctx context.Context, sector abi.SectorID, read storiface.SectorFileType, write storiface.SectorFileType) (bool, error) {
return i.lockWith(ctx, (*sectorLock).tryLockSafe, sector, read, write) return i.lockWith(ctx, (*sectorLock).tryLockSafe, sector, read, write)
} }

View File

@ -2,6 +2,7 @@ package stores
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"testing" "testing"
"time" "time"
@ -17,41 +18,41 @@ var aSector = abi.SectorID{
func TestCanLock(t *testing.T) { func TestCanLock(t *testing.T) {
lk := sectorLock{ lk := sectorLock{
r: [FileTypes]uint{}, r: [storiface.FileTypes]uint{},
w: FTNone, w: storiface.FTNone,
} }
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone)) require.Equal(t, true, lk.canLock(storiface.FTUnsealed, storiface.FTNone))
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed)) require.Equal(t, true, lk.canLock(storiface.FTNone, storiface.FTUnsealed))
ftAll := FTUnsealed | FTSealed | FTCache ftAll := storiface.FTUnsealed | storiface.FTSealed | storiface.FTCache
require.Equal(t, true, lk.canLock(ftAll, FTNone)) require.Equal(t, true, lk.canLock(ftAll, storiface.FTNone))
require.Equal(t, true, lk.canLock(FTNone, ftAll)) require.Equal(t, true, lk.canLock(storiface.FTNone, ftAll))
lk.r[0] = 1 // unsealed read taken lk.r[0] = 1 // unsealed read taken
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone)) require.Equal(t, true, lk.canLock(storiface.FTUnsealed, storiface.FTNone))
require.Equal(t, false, lk.canLock(FTNone, FTUnsealed)) require.Equal(t, false, lk.canLock(storiface.FTNone, storiface.FTUnsealed))
require.Equal(t, true, lk.canLock(ftAll, FTNone)) require.Equal(t, true, lk.canLock(ftAll, storiface.FTNone))
require.Equal(t, false, lk.canLock(FTNone, ftAll)) require.Equal(t, false, lk.canLock(storiface.FTNone, ftAll))
require.Equal(t, true, lk.canLock(FTNone, FTSealed|FTCache)) require.Equal(t, true, lk.canLock(storiface.FTNone, storiface.FTSealed|storiface.FTCache))
require.Equal(t, true, lk.canLock(FTUnsealed, FTSealed|FTCache)) require.Equal(t, true, lk.canLock(storiface.FTUnsealed, storiface.FTSealed|storiface.FTCache))
lk.r[0] = 0 lk.r[0] = 0
lk.w = FTSealed lk.w = storiface.FTSealed
require.Equal(t, true, lk.canLock(FTUnsealed, FTNone)) require.Equal(t, true, lk.canLock(storiface.FTUnsealed, storiface.FTNone))
require.Equal(t, true, lk.canLock(FTNone, FTUnsealed)) require.Equal(t, true, lk.canLock(storiface.FTNone, storiface.FTUnsealed))
require.Equal(t, false, lk.canLock(FTSealed, FTNone)) require.Equal(t, false, lk.canLock(storiface.FTSealed, storiface.FTNone))
require.Equal(t, false, lk.canLock(FTNone, FTSealed)) require.Equal(t, false, lk.canLock(storiface.FTNone, storiface.FTSealed))
require.Equal(t, false, lk.canLock(ftAll, FTNone)) require.Equal(t, false, lk.canLock(ftAll, storiface.FTNone))
require.Equal(t, false, lk.canLock(FTNone, ftAll)) require.Equal(t, false, lk.canLock(storiface.FTNone, ftAll))
} }
func TestIndexLocksSeq(t *testing.T) { func TestIndexLocksSeq(t *testing.T) {
@ -61,32 +62,32 @@ func TestIndexLocksSeq(t *testing.T) {
locks: map[abi.SectorID]*sectorLock{}, locks: map[abi.SectorID]*sectorLock{},
} }
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTUnsealed, storiface.FTNone))
cancel() cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
ctx, cancel = context.WithTimeout(context.Background(), time.Second) ctx, cancel = context.WithTimeout(context.Background(), time.Second)
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
} }
func TestIndexLocksBlockOn(t *testing.T) { func TestIndexLocksBlockOn(t *testing.T) {
test := func(r1 SectorFileType, w1 SectorFileType, r2 SectorFileType, w2 SectorFileType) func(t *testing.T) { test := func(r1 storiface.SectorFileType, w1 storiface.SectorFileType, r2 storiface.SectorFileType, w2 storiface.SectorFileType) func(t *testing.T) {
return func(t *testing.T) { return func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -126,9 +127,9 @@ func TestIndexLocksBlockOn(t *testing.T) {
} }
} }
t.Run("readBlocksWrite", test(FTUnsealed, FTNone, FTNone, FTUnsealed)) t.Run("readBlocksWrite", test(storiface.FTUnsealed, storiface.FTNone, storiface.FTNone, storiface.FTUnsealed))
t.Run("writeBlocksRead", test(FTNone, FTUnsealed, FTUnsealed, FTNone)) t.Run("writeBlocksRead", test(storiface.FTNone, storiface.FTUnsealed, storiface.FTUnsealed, storiface.FTNone))
t.Run("writeBlocksWrite", test(FTNone, FTUnsealed, FTNone, FTUnsealed)) t.Run("writeBlocksWrite", test(storiface.FTNone, storiface.FTUnsealed, storiface.FTNone, storiface.FTUnsealed))
} }
func TestIndexLocksBlockWonR(t *testing.T) { func TestIndexLocksBlockWonR(t *testing.T) {
@ -138,7 +139,7 @@ func TestIndexLocksBlockWonR(t *testing.T) {
locks: map[abi.SectorID]*sectorLock{}, locks: map[abi.SectorID]*sectorLock{},
} }
require.NoError(t, ilk.StorageLock(ctx, aSector, FTUnsealed, FTNone)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTUnsealed, storiface.FTNone))
sch := make(chan struct{}) sch := make(chan struct{})
go func() { go func() {
@ -146,7 +147,7 @@ func TestIndexLocksBlockWonR(t *testing.T) {
sch <- struct{}{} sch <- struct{}{}
require.NoError(t, ilk.StorageLock(ctx, aSector, FTNone, FTUnsealed)) require.NoError(t, ilk.StorageLock(ctx, aSector, storiface.FTNone, storiface.FTUnsealed))
cancel() cancel()
sch <- struct{}{} sch <- struct{}{}

View File

@ -3,34 +3,22 @@ package stores
import ( import (
"context" "context"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
)
type PathType string "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
const (
PathStorage PathType = "storage"
PathSealing PathType = "sealing"
)
type AcquireMode string
const (
AcquireMove AcquireMode = "move"
AcquireCopy AcquireMode = "copy"
) )
type Store interface { type Store interface {
AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, sealing PathType, op AcquireMode) (paths SectorPaths, stores SectorPaths, err error) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType, op storiface.AcquireMode) (paths storiface.SectorPaths, stores storiface.SectorPaths, err error)
Remove(ctx context.Context, s abi.SectorID, types SectorFileType, force bool) error Remove(ctx context.Context, s abi.SectorID, types storiface.SectorFileType, force bool) error
// like remove, but doesn't remove the primary sector copy, nor the last // like remove, but doesn't remove the primary sector copy, nor the last
// non-primary copy if there no primary copies // non-primary copy if there no primary copies
RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error
// move sectors into storage // move sectors into storage
MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types storiface.SectorFileType) error
FsStat(ctx context.Context, id ID) (fsutil.FsStat, error) FsStat(ctx context.Context, id ID) (fsutil.FsStat, error)
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/specs-actors/actors/abi" "github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil" "github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
type StoragePath struct { type StoragePath struct {
@ -59,8 +60,6 @@ type LocalStorage interface {
const MetaFile = "sectorstore.json" const MetaFile = "sectorstore.json"
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}
type Local struct { type Local struct {
localStorage LocalStorage localStorage LocalStorage
index SectorIndex index SectorIndex
@ -75,7 +74,7 @@ type path struct {
local string // absolute local path local string // absolute local path
reserved int64 reserved int64
reservations map[abi.SectorID]SectorFileType reservations map[abi.SectorID]storiface.SectorFileType
} }
func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) { func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
@ -87,7 +86,7 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
stat.Reserved = p.reserved stat.Reserved = p.reserved
for id, ft := range p.reservations { for id, ft := range p.reservations {
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
} }
@ -125,8 +124,8 @@ func (p *path) stat(ls LocalStorage) (fsutil.FsStat, error) {
return stat, err return stat, err
} }
func (p *path) sectorPath(sid abi.SectorID, fileType SectorFileType) string { func (p *path) sectorPath(sid abi.SectorID, fileType storiface.SectorFileType) string {
return filepath.Join(p.local, fileType.String(), SectorName(sid)) return filepath.Join(p.local, fileType.String(), storiface.SectorName(sid))
} }
func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) { func NewLocal(ctx context.Context, ls LocalStorage, index SectorIndex, urls []string) (*Local, error) {
@ -160,7 +159,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
local: p, local: p,
reserved: 0, reserved: 0,
reservations: map[abi.SectorID]SectorFileType{}, reservations: map[abi.SectorID]storiface.SectorFileType{},
} }
fst, err := out.stat(st.localStorage) fst, err := out.stat(st.localStorage)
@ -179,7 +178,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("declaring storage in index: %w", err) return xerrors.Errorf("declaring storage in index: %w", err)
} }
for _, t := range PathTypes { for _, t := range storiface.PathTypes {
ents, err := ioutil.ReadDir(filepath.Join(p, t.String())) ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -197,7 +196,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
continue continue
} }
sid, err := ParseSectorID(ent.Name()) sid, err := storiface.ParseSectorID(ent.Name())
if err != nil { if err != nil {
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
} }
@ -264,7 +263,7 @@ func (st *Local) reportHealth(ctx context.Context) {
} }
} }
func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft SectorFileType, storageIDs SectorPaths, overheadTab map[SectorFileType]int) (func(), error) { func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, ft storiface.SectorFileType, storageIDs storiface.SectorPaths, overheadTab map[storiface.SectorFileType]int) (func(), error) {
ssize, err := spt.SectorSize() ssize, err := spt.SectorSize()
if err != nil { if err != nil {
return nil, xerrors.Errorf("getting sector size: %w", err) return nil, xerrors.Errorf("getting sector size: %w", err)
@ -279,12 +278,12 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register
deferredDone() deferredDone()
}() }()
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&ft == 0 { if fileType&ft == 0 {
continue continue
} }
id := ID(PathByType(storageIDs, fileType)) id := ID(storiface.PathByType(storageIDs, fileType))
p, ok := st.paths[id] p, ok := st.paths[id]
if !ok { if !ok {
@ -296,7 +295,7 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register
return nil, xerrors.Errorf("getting local storage stat: %w", err) return nil, xerrors.Errorf("getting local storage stat: %w", err)
} }
overhead := int64(overheadTab[fileType]) * int64(ssize) / FSOverheadDen overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen
if stat.Available < overhead { if stat.Available < overhead {
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available) return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)
@ -319,18 +318,18 @@ func (st *Local) Reserve(ctx context.Context, sid abi.SectorID, spt abi.Register
return done, nil return done, nil
} }
func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.RegisteredSealProof, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
} }
st.localLk.RLock() st.localLk.RLock()
defer st.localLk.RUnlock() defer st.localLk.RUnlock()
var out SectorPaths var out storiface.SectorPaths
var storageIDs SectorPaths var storageIDs storiface.SectorPaths
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 { if fileType&existing == 0 {
continue continue
} }
@ -352,22 +351,22 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
} }
spath := p.sectorPath(sid, fileType) spath := p.sectorPath(sid, fileType)
SetPathByType(&out, fileType, spath) storiface.SetPathByType(&out, fileType, spath)
SetPathByType(&storageIDs, fileType, string(info.ID)) storiface.SetPathByType(&storageIDs, fileType, string(info.ID))
existing ^= fileType existing ^= fileType
break break
} }
} }
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&allocate == 0 { if fileType&allocate == 0 {
continue continue
} }
sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType) sis, err := st.index.StorageBestAlloc(ctx, fileType, spt, pathType)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("finding best storage for allocating : %w", err) return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("finding best storage for allocating : %w", err)
} }
var best string var best string
@ -383,11 +382,11 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
continue continue
} }
if (pathType == PathSealing) && !si.CanSeal { if (pathType == storiface.PathSealing) && !si.CanSeal {
continue continue
} }
if (pathType == PathStorage) && !si.CanStore { if (pathType == storiface.PathStorage) && !si.CanStore {
continue continue
} }
@ -399,11 +398,11 @@ func (st *Local) AcquireSector(ctx context.Context, sid abi.SectorID, spt abi.Re
} }
if best == "" { if best == "" {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("couldn't find a suitable path for a sector") return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("couldn't find a suitable path for a sector")
} }
SetPathByType(&out, fileType, best) storiface.SetPathByType(&out, fileType, best)
SetPathByType(&storageIDs, fileType, string(bestID)) storiface.SetPathByType(&storageIDs, fileType, string(bestID))
allocate ^= fileType allocate ^= fileType
} }
@ -437,7 +436,7 @@ func (st *Local) Local(ctx context.Context) ([]StoragePath, error) {
return out, nil return out, nil
} }
func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType, force bool) error { func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool) error {
if bits.OnesCount(uint(typ)) != 1 { if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type") return xerrors.New("delete expects one file type")
} }
@ -460,7 +459,7 @@ func (st *Local) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileTyp
return nil return nil
} }
func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorFileType) error { func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType) error {
if bits.OnesCount(uint(typ)) != 1 { if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type") return xerrors.New("delete expects one file type")
} }
@ -496,7 +495,7 @@ func (st *Local) RemoveCopies(ctx context.Context, sid abi.SectorID, typ SectorF
return nil return nil
} }
func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorFileType, storage ID) error { func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, storage ID) error {
p, ok := st.paths[storage] p, ok := st.paths[storage]
if !ok { if !ok {
return nil return nil
@ -520,28 +519,28 @@ func (st *Local) removeSector(ctx context.Context, sid abi.SectorID, typ SectorF
return nil return nil
} }
func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error { func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types storiface.SectorFileType) error {
dest, destIds, err := st.AcquireSector(ctx, s, spt, FTNone, types, PathStorage, AcquireMove) dest, destIds, err := st.AcquireSector(ctx, s, spt, storiface.FTNone, types, storiface.PathStorage, storiface.AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire dest storage: %w", err) return xerrors.Errorf("acquire dest storage: %w", err)
} }
src, srcIds, err := st.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) src, srcIds, err := st.AcquireSector(ctx, s, spt, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage: %w", err) return xerrors.Errorf("acquire src storage: %w", err)
} }
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&types == 0 { if fileType&types == 0 {
continue continue
} }
sst, err := st.index.StorageInfo(ctx, ID(PathByType(srcIds, fileType))) sst, err := st.index.StorageInfo(ctx, ID(storiface.PathByType(srcIds, fileType)))
if err != nil { if err != nil {
return xerrors.Errorf("failed to get source storage info: %w", err) return xerrors.Errorf("failed to get source storage info: %w", err)
} }
dst, err := st.index.StorageInfo(ctx, ID(PathByType(destIds, fileType))) dst, err := st.index.StorageInfo(ctx, ID(storiface.PathByType(destIds, fileType)))
if err != nil { if err != nil {
return xerrors.Errorf("failed to get source storage info: %w", err) return xerrors.Errorf("failed to get source storage info: %w", err)
} }
@ -558,17 +557,17 @@ func (st *Local) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore) log.Debugf("moving %v(%d) to storage: %s(se:%t; st:%t) -> %s(se:%t; st:%t)", s, fileType, sst.ID, sst.CanSeal, sst.CanStore, dst.ID, dst.CanSeal, dst.CanStore)
if err := st.index.StorageDropSector(ctx, ID(PathByType(srcIds, fileType)), s, fileType); err != nil { if err := st.index.StorageDropSector(ctx, ID(storiface.PathByType(srcIds, fileType)), s, fileType); err != nil {
return xerrors.Errorf("dropping source sector from index: %w", err) return xerrors.Errorf("dropping source sector from index: %w", err)
} }
if err := move(PathByType(src, fileType), PathByType(dest, fileType)); err != nil { if err := move(storiface.PathByType(src, fileType), storiface.PathByType(dest, fileType)); err != nil {
// TODO: attempt some recovery (check if src is still there, re-declare) // TODO: attempt some recovery (check if src is still there, re-declare)
return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err) return xerrors.Errorf("moving sector %v(%d): %w", s, fileType, err)
} }
if err := st.index.StorageDeclareSector(ctx, ID(PathByType(destIds, fileType)), s, fileType, true); err != nil { if err := st.index.StorageDeclareSector(ctx, ID(storiface.PathByType(destIds, fileType)), s, fileType, true); err != nil {
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(PathByType(destIds, fileType)), err) return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", s, fileType, ID(storiface.PathByType(destIds, fileType)), err)
} }
} }

View File

@ -38,7 +38,7 @@ type Remote struct {
fetching map[abi.SectorID]chan struct{} fetching map[abi.SectorID]chan struct{}
} }
func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types SectorFileType) error { func (r *Remote) RemoveCopies(ctx context.Context, s abi.SectorID, types storiface.SectorFileType) error {
// TODO: do this on remotes too // TODO: do this on remotes too
// (not that we really need to do that since it's always called by the // (not that we really need to do that since it's always called by the
// worker which pulled the copy) // worker which pulled the copy)
@ -58,9 +58,9 @@ func NewRemote(local *Local, index SectorIndex, auth http.Header, fetchLimit int
} }
} }
func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing SectorFileType, allocate SectorFileType, pathType PathType, op AcquireMode) (SectorPaths, SectorPaths, error) { func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, existing storiface.SectorFileType, allocate storiface.SectorFileType, pathType storiface.PathType, op storiface.AcquireMode) (storiface.SectorPaths, storiface.SectorPaths, error) {
if existing|allocate != existing^allocate { if existing|allocate != existing^allocate {
return SectorPaths{}, SectorPaths{}, xerrors.New("can't both find and allocate a sector") return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.New("can't both find and allocate a sector")
} }
for { for {
@ -79,7 +79,7 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
case <-c: case <-c:
continue continue
case <-ctx.Done(): case <-ctx.Done():
return SectorPaths{}, SectorPaths{}, ctx.Err() return storiface.SectorPaths{}, storiface.SectorPaths{}, ctx.Err()
} }
} }
@ -92,62 +92,62 @@ func (r *Remote) AcquireSector(ctx context.Context, s abi.SectorID, spt abi.Regi
paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op) paths, stores, err := r.local.AcquireSector(ctx, s, spt, existing, allocate, pathType, op)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("local acquire error: %w", err) return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("local acquire error: %w", err)
} }
var toFetch SectorFileType var toFetch storiface.SectorFileType
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 { if fileType&existing == 0 {
continue continue
} }
if PathByType(paths, fileType) == "" { if storiface.PathByType(paths, fileType) == "" {
toFetch |= fileType toFetch |= fileType
} }
} }
apaths, ids, err := r.local.AcquireSector(ctx, s, spt, FTNone, toFetch, pathType, op) apaths, ids, err := r.local.AcquireSector(ctx, s, spt, storiface.FTNone, toFetch, pathType, op)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err) return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("allocate local sector for fetching: %w", err)
} }
odt := FSOverheadSeal odt := storiface.FSOverheadSeal
if pathType == PathStorage { if pathType == storiface.PathStorage {
odt = FsOverheadFinalized odt = storiface.FsOverheadFinalized
} }
releaseStorage, err := r.local.Reserve(ctx, s, spt, toFetch, ids, odt) releaseStorage, err := r.local.Reserve(ctx, s, spt, toFetch, ids, odt)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err) return storiface.SectorPaths{}, storiface.SectorPaths{}, xerrors.Errorf("reserving storage space: %w", err)
} }
defer releaseStorage() defer releaseStorage()
for _, fileType := range PathTypes { for _, fileType := range storiface.PathTypes {
if fileType&existing == 0 { if fileType&existing == 0 {
continue continue
} }
if PathByType(paths, fileType) != "" { if storiface.PathByType(paths, fileType) != "" {
continue continue
} }
dest := PathByType(apaths, fileType) dest := storiface.PathByType(apaths, fileType)
storageID := PathByType(ids, fileType) storageID := storiface.PathByType(ids, fileType)
url, err := r.acquireFromRemote(ctx, s, fileType, dest) url, err := r.acquireFromRemote(ctx, s, fileType, dest)
if err != nil { if err != nil {
return SectorPaths{}, SectorPaths{}, err return storiface.SectorPaths{}, storiface.SectorPaths{}, err
} }
SetPathByType(&paths, fileType, dest) storiface.SetPathByType(&paths, fileType, dest)
SetPathByType(&stores, fileType, storageID) storiface.SetPathByType(&stores, fileType, storageID)
if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == AcquireMove); err != nil { if err := r.index.StorageDeclareSector(ctx, ID(storageID), s, fileType, op == storiface.AcquireMove); err != nil {
log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err) log.Warnf("declaring sector %v in %s failed: %+v", s, storageID, err)
continue continue
} }
if op == AcquireMove { if op == storiface.AcquireMove {
if err := r.deleteFromRemote(ctx, url); err != nil { if err := r.deleteFromRemote(ctx, url); err != nil {
log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err) log.Warnf("deleting sector %v from %s (delete %s): %+v", s, storageID, url, err)
} }
@ -169,7 +169,7 @@ func tempFetchDest(spath string, create bool) (string, error) {
return filepath.Join(tempdir, b), nil return filepath.Join(tempdir, b), nil
} }
func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType SectorFileType, dest string) (string, error) { func (r *Remote) acquireFromRemote(ctx context.Context, s abi.SectorID, fileType storiface.SectorFileType, dest string) (string, error) {
si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false) si, err := r.index.StorageFindSector(ctx, s, fileType, 0, false)
if err != nil { if err != nil {
return "", err return "", err
@ -281,9 +281,9 @@ func (r *Remote) fetch(ctx context.Context, url, outname string) error {
} }
} }
func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types SectorFileType) error { func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.RegisteredSealProof, types storiface.SectorFileType) error {
// Make sure we have the data local // Make sure we have the data local
_, _, err := r.AcquireSector(ctx, s, spt, types, FTNone, PathStorage, AcquireMove) _, _, err := r.AcquireSector(ctx, s, spt, types, storiface.FTNone, storiface.PathStorage, storiface.AcquireMove)
if err != nil { if err != nil {
return xerrors.Errorf("acquire src storage (remote): %w", err) return xerrors.Errorf("acquire src storage (remote): %w", err)
} }
@ -291,7 +291,7 @@ func (r *Remote) MoveStorage(ctx context.Context, s abi.SectorID, spt abi.Regist
return r.local.MoveStorage(ctx, s, spt, types) return r.local.MoveStorage(ctx, s, spt, types)
} }
func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ SectorFileType, force bool) error { func (r *Remote) Remove(ctx context.Context, sid abi.SectorID, typ storiface.SectorFileType, force bool) error {
if bits.OnesCount(uint(typ)) != 1 { if bits.OnesCount(uint(typ)) != 1 {
return xerrors.New("delete expects one file type") return xerrors.New("delete expects one file type")
} }

View File

@ -1,4 +1,4 @@
package stores package storiface
import ( import (
"fmt" "fmt"
@ -16,6 +16,8 @@ const (
FileTypes = iota FileTypes = iota
) )
var PathTypes = []SectorFileType{FTUnsealed, FTSealed, FTCache}
const ( const (
FTNone SectorFileType = 0 FTNone SectorFileType = 0
) )

View File

@ -1 +1,15 @@
package storiface package storiface
type PathType string
const (
PathStorage PathType = "storage"
PathSealing PathType = "sealing"
)
type AcquireMode string
const (
AcquireMove AcquireMode = "move"
AcquireCopy AcquireMode = "copy"
)

View File

@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
) )
type WorkerInfo struct { type WorkerInfo struct {
@ -64,10 +63,10 @@ type WorkerCalls interface {
SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (CallID, error) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (CallID, error)
FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (CallID, error) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (CallID, error)
ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (CallID, error) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (CallID, error)
MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) (CallID, error) MoveStorage(ctx context.Context, sector abi.SectorID, types SectorFileType) (CallID, error)
UnsealPiece(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error) UnsealPiece(context.Context, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize, abi.SealRandomness, cid.Cid) (CallID, error)
ReadPiece(context.Context, io.Writer, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error) ReadPiece(context.Context, io.Writer, abi.SectorID, UnpaddedByteIndex, abi.UnpaddedPieceSize) (CallID, error)
Fetch(context.Context, abi.SectorID, stores.SectorFileType, stores.PathType, stores.AcquireMode) (CallID, error) Fetch(context.Context, abi.SectorID, SectorFileType, PathType, AcquireMode) (CallID, error)
} }
type WorkerReturn interface { type WorkerReturn interface {

View File

@ -85,11 +85,11 @@ func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) error {
panic("implement me") panic("implement me")
} }
func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types stores.SectorFileType) error { func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) error {
panic("implement me") panic("implement me")
} }
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error {
return nil return nil
} }

View File

@ -12,7 +12,6 @@ import (
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks" "github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface" "github.com/filecoin-project/lotus/extern/sector-storage/storiface"
) )
@ -108,7 +107,7 @@ func (t *trackedWorker) AddPiece(ctx context.Context, sector abi.SectorID, piece
return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) return t.Worker.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
} }
func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { func (t *trackedWorker) Fetch(ctx context.Context, s abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error {
defer t.tracker.track(s, sealtasks.TTFetch)() defer t.tracker.track(s, sealtasks.TTFetch)()
return t.Worker.Fetch(ctx, s, ft, ptype, am) return t.Worker.Fetch(ctx, s, ft, ptype, am)