storage: Move storage/sealer/stores to storage/paths
This commit is contained in:
parent
82857e6d5d
commit
28099a3905
@ -54,10 +54,10 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing2 "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -213,7 +213,7 @@ var initCmd = &cli.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
var localPaths []stores.LocalPath
|
||||
var localPaths []paths.LocalPath
|
||||
|
||||
if pssb := cctx.StringSlice("pre-sealed-sectors"); len(pssb) != 0 {
|
||||
log.Infof("Setting up storage config with presealed sectors: %v", pssb)
|
||||
@ -223,14 +223,14 @@ var initCmd = &cli.Command{
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
localPaths = append(localPaths, stores.LocalPath{
|
||||
localPaths = append(localPaths, paths.LocalPath{
|
||||
Path: psp,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if !cctx.Bool("no-local-storage") {
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 10,
|
||||
CanSeal: true,
|
||||
@ -244,12 +244,12 @@ var initCmd = &cli.Command{
|
||||
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
||||
}
|
||||
|
||||
localPaths = append(localPaths, stores.LocalPath{
|
||||
localPaths = append(localPaths, paths.LocalPath{
|
||||
Path: lr.Path(),
|
||||
})
|
||||
}
|
||||
|
||||
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
||||
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("set storage config: %w", err)
|
||||
@ -458,13 +458,13 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api v1api.FullNode
|
||||
wsts := statestore.New(namespace.Wrap(mds, modules.WorkerCallsPrefix))
|
||||
smsts := statestore.New(namespace.Wrap(mds, modules.ManagerWorkPrefix))
|
||||
|
||||
si := stores.NewIndex()
|
||||
si := paths.NewIndex()
|
||||
|
||||
lstor, err := stores.NewLocal(ctx, lr, si, nil)
|
||||
lstor, err := paths.NewLocal(ctx, lr, si, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stor := stores.NewRemote(lstor, si, http.Header(sa), 10, &stores.DefaultPartialFileHandler{})
|
||||
stor := paths.NewRemote(lstor, si, http.Header(sa), 10, &paths.DefaultPartialFileHandler{})
|
||||
|
||||
smgr, err := sealer.New(ctx, lstor, stor, lr, si, sealer.Config{
|
||||
ParallelFetchLimit: 10,
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/lib/backupds"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
)
|
||||
|
||||
var restoreCmd = &cli.Command{
|
||||
@ -52,7 +52,7 @@ var restoreCmd = &cli.Command{
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
log.Info("Initializing lotus miner using a backup")
|
||||
|
||||
var storageCfg *stores.StorageConfig
|
||||
var storageCfg *paths.StorageConfig
|
||||
if cctx.IsSet("storage-config") {
|
||||
cf, err := homedir.Expand(cctx.String("storage-config"))
|
||||
if err != nil {
|
||||
@ -64,7 +64,7 @@ var restoreCmd = &cli.Command{
|
||||
return xerrors.Errorf("reading storage config: %w", err)
|
||||
}
|
||||
|
||||
storageCfg = &stores.StorageConfig{}
|
||||
storageCfg = &paths.StorageConfig{}
|
||||
err = json.Unmarshal(cfb, storageCfg)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("cannot unmarshal json for storage config: %w", err)
|
||||
@ -95,7 +95,7 @@ var restoreCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfig *stores.StorageConfig, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi api.MinerInfo) error) error {
|
||||
func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfig *paths.StorageConfig, manageConfig func(*config.StorageMiner) error, after func(api lapi.FullNode, addr address.Address, peerid peer.ID, mi api.MinerInfo) error) error {
|
||||
if cctx.Args().Len() != 1 {
|
||||
return xerrors.Errorf("expected 1 argument")
|
||||
}
|
||||
@ -214,7 +214,7 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
|
||||
if strConfig != nil {
|
||||
log.Info("Restoring storage path config")
|
||||
|
||||
err = lr.SetStorage(func(scfg *stores.StorageConfig) {
|
||||
err = lr.SetStorage(func(scfg *paths.StorageConfig) {
|
||||
*scfg = *strConfig
|
||||
})
|
||||
if err != nil {
|
||||
@ -223,8 +223,8 @@ func restore(ctx context.Context, cctx *cli.Context, targetPath string, strConfi
|
||||
} else {
|
||||
log.Warn("--storage-config NOT SET. NO SECTOR PATHS WILL BE CONFIGURED")
|
||||
// setting empty config to allow miner to be started
|
||||
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{})
|
||||
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{})
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("set storage config: %w", err)
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
cliutil "github.com/filecoin-project/lotus/cli/util"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -78,7 +78,7 @@ var serviceCmd = &cli.Command{
|
||||
return xerrors.Errorf("please provide Lotus markets repo path via flag %s", FlagMarketsRepo)
|
||||
}
|
||||
|
||||
if err := restore(ctx, cctx, repoPath, &stores.StorageConfig{}, func(cfg *config.StorageMiner) error {
|
||||
if err := restore(ctx, cctx, repoPath, &paths.StorageConfig{}, func(cfg *config.StorageMiner) error {
|
||||
cfg.Subsystems.EnableMarkets = es.Contains(MarketsService)
|
||||
cfg.Subsystems.EnableMining = false
|
||||
cfg.Subsystems.EnableSealing = false
|
||||
|
@ -28,9 +28,9 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/lib/tablewriter"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -145,7 +145,7 @@ over time
|
||||
}
|
||||
}
|
||||
|
||||
cfg := &stores.LocalStorageMeta{
|
||||
cfg := &paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: cctx.Uint64("weight"),
|
||||
CanSeal: cctx.Bool("seal"),
|
||||
|
@ -28,9 +28,9 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/wallet/key"
|
||||
"github.com/filecoin-project/lotus/genesis"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
ffiwrapper "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper/basicfs"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -127,7 +127,7 @@ func PreSeal(maddr address.Address, spt abi.RegisteredSealProof, offset abi.Sect
|
||||
}
|
||||
|
||||
{
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 0, // read-only
|
||||
CanSeal: false,
|
||||
|
@ -35,9 +35,9 @@ import (
|
||||
"github.com/filecoin-project/lotus/metrics"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -384,10 +384,10 @@ var runCmd = &cli.Command{
|
||||
return err
|
||||
}
|
||||
|
||||
var localPaths []stores.LocalPath
|
||||
var localPaths []paths.LocalPath
|
||||
|
||||
if !cctx.Bool("no-local-storage") {
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 10,
|
||||
CanSeal: true,
|
||||
@ -401,12 +401,12 @@ var runCmd = &cli.Command{
|
||||
return xerrors.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
||||
}
|
||||
|
||||
localPaths = append(localPaths, stores.LocalPath{
|
||||
localPaths = append(localPaths, paths.LocalPath{
|
||||
Path: lr.Path(),
|
||||
})
|
||||
}
|
||||
|
||||
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
||||
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("set storage config: %w", err)
|
||||
@ -456,7 +456,7 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
}
|
||||
|
||||
localStore, err := stores.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
|
||||
localStore, err := paths.NewLocal(ctx, lr, nodeApi, []string{"http://" + address + "/remote"})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -467,10 +467,10 @@ var runCmd = &cli.Command{
|
||||
return xerrors.Errorf("could not get api info: %w", err)
|
||||
}
|
||||
|
||||
remote := stores.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
|
||||
&stores.DefaultPartialFileHandler{})
|
||||
remote := paths.NewRemote(localStore, nodeApi, sminfo.AuthHeader(), cctx.Int("parallel-fetch-limit"),
|
||||
&paths.DefaultPartialFileHandler{})
|
||||
|
||||
fh := &stores.FetchHandler{Local: localStore, PfHandler: &stores.DefaultPartialFileHandler{}}
|
||||
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
|
||||
remoteHandler := func(w http.ResponseWriter, r *http.Request) {
|
||||
if !auth.HasPerm(r.Context(), nil, api.PermAdmin) {
|
||||
w.WriteHeader(401)
|
||||
@ -561,7 +561,7 @@ var runCmd = &cli.Command{
|
||||
}
|
||||
|
||||
go func() {
|
||||
heartbeats := time.NewTicker(stores.HeartbeatInterval)
|
||||
heartbeats := time.NewTicker(paths.HeartbeatInterval)
|
||||
defer heartbeats.Stop()
|
||||
|
||||
var redeclareStorage bool
|
||||
|
@ -18,8 +18,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/lib/rpcenc"
|
||||
"github.com/filecoin-project/lotus/metrics/proxy"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -55,8 +55,8 @@ func WorkerHandler(authv func(ctx context.Context, token string) ([]auth.Permiss
|
||||
type Worker struct {
|
||||
*sealer.LocalWorker
|
||||
|
||||
LocalStore *stores.Local
|
||||
Storage stores.LocalStorage
|
||||
LocalStore *paths.Local
|
||||
Storage paths.LocalStorage
|
||||
|
||||
disabled int64
|
||||
}
|
||||
@ -75,8 +75,8 @@ func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
|
||||
return xerrors.Errorf("opening local path: %w", err)
|
||||
}
|
||||
|
||||
if err := w.Storage.SetStorage(func(sc *stores.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
|
||||
if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: path})
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("get storage config: %w", err)
|
||||
}
|
||||
|
@ -13,7 +13,7 @@ import (
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
lcli "github.com/filecoin-project/lotus/cli"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -101,7 +101,7 @@ var storageAttachCmd = &cli.Command{
|
||||
}
|
||||
}
|
||||
|
||||
cfg := &stores.LocalStorageMeta{
|
||||
cfg := &paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: cctx.Uint64("weight"),
|
||||
CanSeal: cctx.Bool("seal"),
|
||||
|
@ -55,10 +55,10 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
testing2 "github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sectorstorage "github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
mock2 "github.com/filecoin-project/lotus/storage/sealer/mock"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -546,8 +546,8 @@ func (n *Ensemble) Start() *Ensemble {
|
||||
// using real proofs, therefore need real sectors.
|
||||
if !n.bootstrapped && !n.options.mockProofs {
|
||||
psd := m.PresealDir
|
||||
err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: psd})
|
||||
err := lr.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: psd})
|
||||
})
|
||||
|
||||
require.NoError(n.t, err)
|
||||
@ -698,15 +698,15 @@ func (n *Ensemble) Start() *Ensemble {
|
||||
|
||||
addr := m.RemoteListener.Addr().String()
|
||||
|
||||
localStore, err := stores.NewLocal(ctx, lr, m.MinerNode, []string{"http://" + addr + "/remote"})
|
||||
localStore, err := paths.NewLocal(ctx, lr, m.MinerNode, []string{"http://" + addr + "/remote"})
|
||||
require.NoError(n.t, err)
|
||||
|
||||
auth := http.Header(nil)
|
||||
|
||||
remote := stores.NewRemote(localStore, m.MinerNode, auth, 20, &stores.DefaultPartialFileHandler{})
|
||||
remote := paths.NewRemote(localStore, m.MinerNode, auth, 20, &paths.DefaultPartialFileHandler{})
|
||||
store := m.options.workerStorageOpt(remote)
|
||||
|
||||
fh := &stores.FetchHandler{Local: localStore, PfHandler: &stores.DefaultPartialFileHandler{}}
|
||||
fh := &paths.FetchHandler{Local: localStore, PfHandler: &paths.DefaultPartialFileHandler{}}
|
||||
m.FetchHandler = fh.ServeHTTP
|
||||
|
||||
wsts := statestore.New(namespace.Wrap(ds, modules.WorkerCallsPrefix))
|
||||
|
@ -25,8 +25,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/wallet/key"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -182,7 +182,7 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, weight uint64
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
cfg := &stores.LocalStorageMeta{
|
||||
cfg := &paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: weight,
|
||||
CanSeal: seal,
|
||||
|
@ -12,9 +12,9 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
)
|
||||
|
||||
// DefaultPresealsPerBootstrapMiner is the number of preseals that every
|
||||
@ -45,7 +45,7 @@ type nodeOpts struct {
|
||||
disallowRemoteFinalize bool
|
||||
|
||||
workerTasks []sealtasks.TaskType
|
||||
workerStorageOpt func(stores.Store) stores.Store
|
||||
workerStorageOpt func(paths.Store) paths.Store
|
||||
}
|
||||
|
||||
// DefaultNodeOpts are the default options that will be applied to test nodes.
|
||||
@ -55,7 +55,7 @@ var DefaultNodeOpts = nodeOpts{
|
||||
sectorSize: abi.SectorSize(2 << 10), // 2KiB.
|
||||
|
||||
workerTasks: []sealtasks.TaskType{sealtasks.TTFetch, sealtasks.TTCommit1, sealtasks.TTFinalize},
|
||||
workerStorageOpt: func(store stores.Store) stores.Store { return store },
|
||||
workerStorageOpt: func(store paths.Store) paths.Store { return store },
|
||||
}
|
||||
|
||||
// OptBuilder is used to create an option after some other node is already
|
||||
@ -210,7 +210,7 @@ func WithTaskTypes(tt []sealtasks.TaskType) NodeOpt {
|
||||
}
|
||||
}
|
||||
|
||||
func WithWorkerStorage(transform func(stores.Store) stores.Store) NodeOpt {
|
||||
func WithWorkerStorage(transform func(paths.Store) paths.Store) NodeOpt {
|
||||
return func(opts *nodeOpts) error {
|
||||
opts.workerStorageOpt = transform
|
||||
return nil
|
||||
|
@ -21,8 +21,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||
)
|
||||
@ -227,7 +227,7 @@ func TestWindowPostWorker(t *testing.T) {
|
||||
}
|
||||
|
||||
type badWorkerStorage struct {
|
||||
stores.Store
|
||||
paths.Store
|
||||
|
||||
badsector *uint64
|
||||
notBadCount int
|
||||
@ -258,14 +258,14 @@ func TestWindowPostWorkerSkipBadSector(t *testing.T) {
|
||||
kit.LatestActorsAt(-1),
|
||||
kit.ThroughRPC(),
|
||||
kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt}),
|
||||
kit.WithWorkerStorage(func(store stores.Store) stores.Store {
|
||||
kit.WithWorkerStorage(func(store paths.Store) paths.Store {
|
||||
return &badWorkerStorage{
|
||||
Store: store,
|
||||
badsector: &badsector,
|
||||
}
|
||||
}),
|
||||
kit.ConstructorOpts(node.ApplyIf(node.IsType(repo.StorageMiner),
|
||||
node.Override(new(stores.Store), func(store *stores.Remote) stores.Store {
|
||||
node.Override(new(paths.Store), func(store *paths.Remote) paths.Store {
|
||||
return &badWorkerStorage{
|
||||
Store: store,
|
||||
badsector: &badsector,
|
||||
|
@ -41,7 +41,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/lp2p"
|
||||
"github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/system"
|
||||
)
|
||||
|
||||
@ -262,10 +262,10 @@ func ConfigCommon(cfg *config.Common, enableLibp2pNode bool) Option {
|
||||
Override(SetApiEndpointKey, func(lr repo.LockedRepo, e dtypes.APIEndpoint) error {
|
||||
return lr.SetAPIEndpoint(e)
|
||||
}),
|
||||
Override(new(stores.URLs), func(e dtypes.APIEndpoint) (stores.URLs, error) {
|
||||
Override(new(paths.URLs), func(e dtypes.APIEndpoint) (paths.URLs, error) {
|
||||
ip := cfg.API.RemoteListenAddress
|
||||
|
||||
var urls stores.URLs
|
||||
var urls paths.URLs
|
||||
urls = append(urls, "http://"+ip+"/remote") // TODO: This makes no assumptions, and probably could...
|
||||
return urls, nil
|
||||
}),
|
||||
|
@ -33,10 +33,10 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/ctladdr"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
sectorstorage "github.com/filecoin-project/lotus/storage/sealer"
|
||||
ffiwrapper "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||
@ -87,10 +87,10 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
Override(CheckFDLimit, modules.CheckFdLimit(build.MinerFDLimit)), // recommend at least 100k FD limit to miners
|
||||
|
||||
Override(new(api.MinerSubsystems), modules.ExtractEnabledMinerSubsystems(cfg.Subsystems)),
|
||||
Override(new(stores.LocalStorage), From(new(repo.LockedRepo))),
|
||||
Override(new(*stores.Local), modules.LocalStorage),
|
||||
Override(new(*stores.Remote), modules.RemoteStorage),
|
||||
Override(new(stores.Store), From(new(*stores.Remote))),
|
||||
Override(new(paths.LocalStorage), From(new(repo.LockedRepo))),
|
||||
Override(new(*paths.Local), modules.LocalStorage),
|
||||
Override(new(*paths.Remote), modules.RemoteStorage),
|
||||
Override(new(paths.Store), From(new(*paths.Remote))),
|
||||
Override(new(dtypes.RetrievalPricingFunc), modules.RetrievalPricingFunc(cfg.Dealmaking)),
|
||||
|
||||
If(!cfg.Subsystems.EnableMining,
|
||||
@ -124,8 +124,8 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
|
||||
If(cfg.Subsystems.EnableSectorStorage,
|
||||
// Sector storage
|
||||
Override(new(*stores.Index), stores.NewIndex),
|
||||
Override(new(stores.SectorIndex), From(new(*stores.Index))),
|
||||
Override(new(*paths.Index), paths.NewIndex),
|
||||
Override(new(paths.SectorIndex), From(new(*paths.Index))),
|
||||
Override(new(*sectorstorage.Manager), modules.SectorStorage),
|
||||
Override(new(sectorstorage.Unsealer), From(new(*sectorstorage.Manager))),
|
||||
Override(new(sectorstorage.SectorManager), From(new(*sectorstorage.Manager))),
|
||||
@ -140,7 +140,7 @@ func ConfigStorageMiner(c interface{}) Option {
|
||||
),
|
||||
If(!cfg.Subsystems.EnableSealing,
|
||||
Override(new(modules.MinerSealingService), modules.ConnectSealingService(cfg.Subsystems.SealerApiInfo)),
|
||||
Override(new(stores.SectorIndex), From(new(modules.MinerSealingService))),
|
||||
Override(new(paths.SectorIndex), From(new(modules.MinerSealingService))),
|
||||
),
|
||||
|
||||
If(cfg.Subsystems.EnableMarkets,
|
||||
|
@ -8,11 +8,11 @@ import (
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
)
|
||||
|
||||
func StorageFromFile(path string, def *stores.StorageConfig) (*stores.StorageConfig, error) {
|
||||
func StorageFromFile(path string, def *paths.StorageConfig) (*paths.StorageConfig, error) {
|
||||
file, err := os.Open(path)
|
||||
switch {
|
||||
case os.IsNotExist(err):
|
||||
@ -28,8 +28,8 @@ func StorageFromFile(path string, def *stores.StorageConfig) (*stores.StorageCon
|
||||
return StorageFromReader(file)
|
||||
}
|
||||
|
||||
func StorageFromReader(reader io.Reader) (*stores.StorageConfig, error) {
|
||||
var cfg stores.StorageConfig
|
||||
func StorageFromReader(reader io.Reader) (*paths.StorageConfig, error) {
|
||||
var cfg paths.StorageConfig
|
||||
err := json.NewDecoder(reader).Decode(&cfg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -38,7 +38,7 @@ func StorageFromReader(reader io.Reader) (*stores.StorageConfig, error) {
|
||||
return &cfg, nil
|
||||
}
|
||||
|
||||
func WriteStorageFile(path string, config stores.StorageConfig) error {
|
||||
func WriteStorageFile(path string, config paths.StorageConfig) error {
|
||||
b, err := json.MarshalIndent(config, "", " ")
|
||||
if err != nil {
|
||||
return xerrors.Errorf("marshaling storage config: %w", err)
|
||||
|
@ -48,11 +48,11 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/ctladdr"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||
@ -67,8 +67,8 @@ type StorageMinerAPI struct {
|
||||
EnabledSubsystems api.MinerSubsystems
|
||||
|
||||
Full api.FullNode
|
||||
LocalStore *stores.Local
|
||||
RemoteStore *stores.Remote
|
||||
LocalStore *paths.Local
|
||||
RemoteStore *paths.Remote
|
||||
|
||||
// Markets
|
||||
PieceStore dtypes.ProviderPieceStore `optional:"true"`
|
||||
@ -89,7 +89,7 @@ type StorageMinerAPI struct {
|
||||
BlockMiner *miner.Miner `optional:"true"`
|
||||
StorageMgr *sealer.Manager `optional:"true"`
|
||||
IStorageMgr sealer.SectorManager `optional:"true"`
|
||||
stores.SectorIndex
|
||||
paths.SectorIndex
|
||||
storiface.WorkerReturn `optional:"true"`
|
||||
AddrSel *ctladdr.AddressSelector
|
||||
|
||||
|
@ -66,11 +66,11 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/ctladdr"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
sealing "github.com/filecoin-project/lotus/storage/pipeline"
|
||||
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
||||
"github.com/filecoin-project/lotus/storage/sealer"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/wdpost"
|
||||
)
|
||||
|
||||
@ -735,16 +735,16 @@ func RetrievalProvider(
|
||||
var WorkerCallsPrefix = datastore.NewKey("/worker/calls")
|
||||
var ManagerWorkPrefix = datastore.NewKey("/stmgr/calls")
|
||||
|
||||
func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls stores.LocalStorage, si stores.SectorIndex, urls stores.URLs) (*stores.Local, error) {
|
||||
func LocalStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, ls paths.LocalStorage, si paths.SectorIndex, urls paths.URLs) (*paths.Local, error) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
return stores.NewLocal(ctx, ls, si, urls)
|
||||
return paths.NewLocal(ctx, ls, si, urls)
|
||||
}
|
||||
|
||||
func RemoteStorage(lstor *stores.Local, si stores.SectorIndex, sa sealer.StorageAuth, sc sealer.Config) *stores.Remote {
|
||||
return stores.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &stores.DefaultPartialFileHandler{})
|
||||
func RemoteStorage(lstor *paths.Local, si paths.SectorIndex, sa sealer.StorageAuth, sc sealer.Config) *paths.Remote {
|
||||
return paths.NewRemote(lstor, si, http.Header(sa), sc.ParallelFetchLimit, &paths.DefaultPartialFileHandler{})
|
||||
}
|
||||
|
||||
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc sealer.Config, ds dtypes.MetadataDS) (*sealer.Manager, error) {
|
||||
func SectorStorage(mctx helpers.MetricsCtx, lc fx.Lifecycle, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc sealer.Config, ds dtypes.MetadataDS) (*sealer.Manager, error) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
wsts := statestore.New(namespace.Wrap(ds, WorkerCallsPrefix))
|
||||
|
@ -25,8 +25,8 @@ import (
|
||||
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -572,26 +572,26 @@ func (fsr *fsLockedRepo) SetConfig(c func(interface{})) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) GetStorage() (stores.StorageConfig, error) {
|
||||
func (fsr *fsLockedRepo) GetStorage() (paths.StorageConfig, error) {
|
||||
fsr.storageLk.Lock()
|
||||
defer fsr.storageLk.Unlock()
|
||||
|
||||
return fsr.getStorage(nil)
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) getStorage(def *stores.StorageConfig) (stores.StorageConfig, error) {
|
||||
func (fsr *fsLockedRepo) getStorage(def *paths.StorageConfig) (paths.StorageConfig, error) {
|
||||
c, err := config.StorageFromFile(fsr.join(fsStorageConfig), def)
|
||||
if err != nil {
|
||||
return stores.StorageConfig{}, err
|
||||
return paths.StorageConfig{}, err
|
||||
}
|
||||
return *c, nil
|
||||
}
|
||||
|
||||
func (fsr *fsLockedRepo) SetStorage(c func(*stores.StorageConfig)) error {
|
||||
func (fsr *fsLockedRepo) SetStorage(c func(*paths.StorageConfig)) error {
|
||||
fsr.storageLk.Lock()
|
||||
defer fsr.storageLk.Unlock()
|
||||
|
||||
sc, err := fsr.getStorage(&stores.StorageConfig{})
|
||||
sc, err := fsr.getStorage(&paths.StorageConfig{})
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get storage: %w", err)
|
||||
}
|
||||
|
@ -9,8 +9,8 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
)
|
||||
|
||||
// BlockstoreDomain represents the domain of a blockstore.
|
||||
@ -73,8 +73,8 @@ type LockedRepo interface {
|
||||
Config() (interface{}, error)
|
||||
SetConfig(func(interface{})) error
|
||||
|
||||
GetStorage() (stores.StorageConfig, error)
|
||||
SetStorage(func(*stores.StorageConfig)) error
|
||||
GetStorage() (paths.StorageConfig, error)
|
||||
SetStorage(func(*paths.StorageConfig)) error
|
||||
Stat(path string) (fsutil.FsStat, error)
|
||||
DiskUsage(path string) (int64, error)
|
||||
|
||||
|
@ -18,8 +18,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -37,7 +37,7 @@ type MemRepo struct {
|
||||
keystore map[string]types.KeyInfo
|
||||
blockstore blockstore.Blockstore
|
||||
|
||||
sc *stores.StorageConfig
|
||||
sc *paths.StorageConfig
|
||||
tempDir string
|
||||
|
||||
// holds the current config value
|
||||
@ -59,13 +59,13 @@ func (lmem *lockedMemRepo) RepoType() RepoType {
|
||||
return lmem.t
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
|
||||
func (lmem *lockedMemRepo) GetStorage() (paths.StorageConfig, error) {
|
||||
if err := lmem.checkToken(); err != nil {
|
||||
return stores.StorageConfig{}, err
|
||||
return paths.StorageConfig{}, err
|
||||
}
|
||||
|
||||
if lmem.mem.sc == nil {
|
||||
lmem.mem.sc = &stores.StorageConfig{StoragePaths: []stores.LocalPath{
|
||||
lmem.mem.sc = &paths.StorageConfig{StoragePaths: []paths.LocalPath{
|
||||
{Path: lmem.Path()},
|
||||
}}
|
||||
}
|
||||
@ -73,7 +73,7 @@ func (lmem *lockedMemRepo) GetStorage() (stores.StorageConfig, error) {
|
||||
return *lmem.mem.sc, nil
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) SetStorage(c func(*stores.StorageConfig)) error {
|
||||
func (lmem *lockedMemRepo) SetStorage(c func(*paths.StorageConfig)) error {
|
||||
if err := lmem.checkToken(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -126,14 +126,14 @@ func (lmem *lockedMemRepo) Path() string {
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) initSectorStore(t string) {
|
||||
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), stores.StorageConfig{
|
||||
StoragePaths: []stores.LocalPath{
|
||||
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), paths.StorageConfig{
|
||||
StoragePaths: []paths.LocalPath{
|
||||
{Path: t},
|
||||
}}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 10,
|
||||
CanSeal: true,
|
||||
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"bytes"
|
@ -1,4 +1,4 @@
|
||||
package stores_test
|
||||
package paths_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
@ -16,9 +16,9 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
mocks2 "github.com/filecoin-project/lotus/storage/paths/mocks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
mocks "github.com/filecoin-project/lotus/storage/sealer/stores/mocks"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -64,8 +64,8 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
|
||||
tcs := map[string]struct {
|
||||
piFnc func(pi *pieceInfo)
|
||||
storeFnc func(s *mocks.MockStore)
|
||||
pfFunc func(s *mocks.MockPartialFileHandler)
|
||||
storeFnc func(s *mocks2.MockStore)
|
||||
pfFunc func(s *mocks2.MockPartialFileHandler)
|
||||
|
||||
// expectation
|
||||
expectedStatusCode int
|
||||
@ -102,7 +102,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
"fails when errors out during acquiring unsealed sector file": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -113,7 +113,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
"fails when unsealed sector file is not found locally": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{},
|
||||
@ -122,7 +122,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
"fails when error while opening partial file": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -131,7 +131,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
storiface.SectorPaths{}, nil).Times(1)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
|
||||
xerrors.New("some error")).Times(1)
|
||||
},
|
||||
@ -139,7 +139,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
|
||||
"fails when determining partial file allocation returns an error": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -148,7 +148,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
storiface.SectorPaths{}, nil).Times(1)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
|
||||
nil).Times(1)
|
||||
|
||||
@ -158,7 +158,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
"StatusRequestedRangeNotSatisfiable when piece is NOT allocated in partial file": {
|
||||
expectedStatusCode: http.StatusRequestedRangeNotSatisfiable,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -167,7 +167,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
storiface.SectorPaths{}, nil).Times(1)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
|
||||
nil).Times(1)
|
||||
|
||||
@ -177,7 +177,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
},
|
||||
"OK when piece is allocated in partial file": {
|
||||
expectedStatusCode: http.StatusOK,
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -186,7 +186,7 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
storiface.SectorPaths{}, nil).Times(1)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(emptyPartialFile,
|
||||
nil).Times(1)
|
||||
|
||||
@ -204,10 +204,10 @@ func TestRemoteGetAllocated(t *testing.T) {
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)
|
||||
lstore := mocks2.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl)
|
||||
|
||||
handler := &stores.FetchHandler{
|
||||
handler := &paths.FetchHandler{
|
||||
lstore,
|
||||
pfhandler,
|
||||
}
|
||||
@ -274,7 +274,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
|
||||
tcs := map[string]struct {
|
||||
siFnc func(pi *sectorInfo)
|
||||
storeFnc func(s *mocks.MockStore, path string)
|
||||
storeFnc func(s *mocks2.MockStore, path string)
|
||||
|
||||
// reading a file or a dir
|
||||
isDir bool
|
||||
@ -300,7 +300,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
noResponseBytes: true,
|
||||
},
|
||||
"fails when error while acquiring sector file": {
|
||||
storeFnc: func(l *mocks.MockStore, _ string) {
|
||||
storeFnc: func(l *mocks2.MockStore, _ string) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -313,7 +313,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
},
|
||||
"fails when acquired sector file path is empty": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore, _ string) {
|
||||
storeFnc: func(l *mocks2.MockStore, _ string) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{},
|
||||
@ -323,7 +323,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
},
|
||||
"fails when acquired file does not exist": {
|
||||
expectedStatusCode: http.StatusInternalServerError,
|
||||
storeFnc: func(l *mocks.MockStore, _ string) {
|
||||
storeFnc: func(l *mocks2.MockStore, _ string) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -334,7 +334,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
noResponseBytes: true,
|
||||
},
|
||||
"successfully read a sector file": {
|
||||
storeFnc: func(l *mocks.MockStore, path string) {
|
||||
storeFnc: func(l *mocks2.MockStore, path string) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -349,7 +349,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
expectedResponseBytes: fileBytes,
|
||||
},
|
||||
"successfully read a sector dir": {
|
||||
storeFnc: func(l *mocks.MockStore, path string) {
|
||||
storeFnc: func(l *mocks2.MockStore, path string) {
|
||||
|
||||
l.EXPECT().AcquireSector(gomock.Any(), expectedSectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
@ -372,8 +372,8 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
mockCtrl := gomock.NewController(t)
|
||||
// when test is done, assert expectations on all mock objects.
|
||||
defer mockCtrl.Finish()
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)
|
||||
lstore := mocks2.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl)
|
||||
|
||||
var path string
|
||||
|
||||
@ -406,7 +406,7 @@ func TestRemoteGetSector(t *testing.T) {
|
||||
path = tempDir
|
||||
}
|
||||
|
||||
handler := &stores.FetchHandler{
|
||||
handler := &paths.FetchHandler{
|
||||
lstore,
|
||||
pfhandler,
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"sync"
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"context"
|
@ -1,5 +1,5 @@
|
||||
//stm: #unit
|
||||
package stores_test
|
||||
package paths_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
@ -24,22 +24,22 @@ import (
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
mocks2 "github.com/filecoin-project/lotus/storage/paths/mocks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/partialfile"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
mocks "github.com/filecoin-project/lotus/storage/sealer/stores/mocks"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
const metaFile = "sectorstore.json"
|
||||
|
||||
func createTestStorage(t *testing.T, p string, seal bool, att ...*stores.Local) storiface.ID {
|
||||
func createTestStorage(t *testing.T, p string, seal bool, att ...*paths.Local) storiface.ID {
|
||||
if err := os.MkdirAll(p, 0755); err != nil {
|
||||
if !os.IsExist(err) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
cfg := &stores.LocalStorageMeta{
|
||||
cfg := &paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 10,
|
||||
CanSeal: seal,
|
||||
@ -61,7 +61,7 @@ func createTestStorage(t *testing.T, p string, seal bool, att ...*stores.Local)
|
||||
func TestMoveShared(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
|
||||
index := stores.NewIndex()
|
||||
index := paths.NewIndex()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
@ -78,8 +78,8 @@ func TestMoveShared(t *testing.T) {
|
||||
_ = lr.Close()
|
||||
})
|
||||
|
||||
err = lr.SetStorage(func(config *stores.StorageConfig) {
|
||||
*config = stores.StorageConfig{}
|
||||
err = lr.SetStorage(func(config *paths.StorageConfig) {
|
||||
*config = paths.StorageConfig{}
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -98,9 +98,9 @@ func TestMoveShared(t *testing.T) {
|
||||
hs1 := httptest.NewServer(mux1)
|
||||
hs2 := httptest.NewServer(mux2)
|
||||
|
||||
ls1, err := stores.NewLocal(ctx, lr1, index, []string{hs1.URL + "/remote"})
|
||||
ls1, err := paths.NewLocal(ctx, lr1, index, []string{hs1.URL + "/remote"})
|
||||
require.NoError(t, err)
|
||||
ls2, err := stores.NewLocal(ctx, lr2, index, []string{hs2.URL + "/remote"})
|
||||
ls2, err := paths.NewLocal(ctx, lr2, index, []string{hs2.URL + "/remote"})
|
||||
require.NoError(t, err)
|
||||
|
||||
dirStor := filepath.Join(dir, "stor")
|
||||
@ -109,11 +109,11 @@ func TestMoveShared(t *testing.T) {
|
||||
id1 := createTestStorage(t, dirStor, false, ls1, ls2)
|
||||
id2 := createTestStorage(t, dirSeal, true, ls1)
|
||||
|
||||
rs1 := stores.NewRemote(ls1, index, nil, 20, &stores.DefaultPartialFileHandler{})
|
||||
rs2 := stores.NewRemote(ls2, index, nil, 20, &stores.DefaultPartialFileHandler{})
|
||||
rs1 := paths.NewRemote(ls1, index, nil, 20, &paths.DefaultPartialFileHandler{})
|
||||
rs2 := paths.NewRemote(ls2, index, nil, 20, &paths.DefaultPartialFileHandler{})
|
||||
_ = rs2
|
||||
mux1.PathPrefix("/").Handler(&stores.FetchHandler{Local: ls1, PfHandler: &stores.DefaultPartialFileHandler{}})
|
||||
mux2.PathPrefix("/").Handler(&stores.FetchHandler{Local: ls2, PfHandler: &stores.DefaultPartialFileHandler{}})
|
||||
mux1.PathPrefix("/").Handler(&paths.FetchHandler{Local: ls1, PfHandler: &paths.DefaultPartialFileHandler{}})
|
||||
mux2.PathPrefix("/").Handler(&paths.FetchHandler{Local: ls2, PfHandler: &paths.DefaultPartialFileHandler{}})
|
||||
|
||||
// add a sealed replica file to the sealing (non-shared) path
|
||||
|
||||
@ -175,9 +175,9 @@ func TestReader(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
tcs := map[string]struct {
|
||||
storeFnc func(s *mocks.MockStore)
|
||||
pfFunc func(s *mocks.MockPartialFileHandler)
|
||||
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
|
||||
storeFnc func(s *mocks2.MockStore)
|
||||
pfFunc func(s *mocks2.MockPartialFileHandler)
|
||||
indexFnc func(s *mocks2.MockSectorIndex, serverURL string)
|
||||
|
||||
needHttpServer bool
|
||||
|
||||
@ -194,7 +194,7 @@ func TestReader(t *testing.T) {
|
||||
|
||||
// -------- have the unsealed file locally
|
||||
"fails when error while acquiring unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
|
||||
},
|
||||
|
||||
@ -202,22 +202,22 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when error while opening local partial (unsealed) file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
|
||||
},
|
||||
errStr: "pf open error",
|
||||
},
|
||||
|
||||
"fails when error while checking if local unsealed file has piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, xerrors.New("piece check error"))
|
||||
@ -227,11 +227,11 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when error while closing local unsealed file that does not have the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
@ -241,11 +241,11 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when error while fetching reader for the local unsealed file that has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
@ -258,11 +258,11 @@ func TestReader(t *testing.T) {
|
||||
// ------------------- don't have the unsealed file locally
|
||||
|
||||
"fails when error while finding sector": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, xerrors.New("find sector error"))
|
||||
},
|
||||
@ -270,11 +270,11 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when no worker has unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, nil)
|
||||
},
|
||||
@ -283,11 +283,11 @@ func TestReader(t *testing.T) {
|
||||
|
||||
// --- nil reader when local unsealed file does NOT have unsealed piece
|
||||
"nil reader when local unsealed file does not have the unsealed piece and remote sector also dosen't have the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
@ -296,7 +296,7 @@ func TestReader(t *testing.T) {
|
||||
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -311,11 +311,11 @@ func TestReader(t *testing.T) {
|
||||
|
||||
// ---- nil reader when none of the remote unsealed file has unsealed piece
|
||||
"nil reader when none of the worker has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -329,11 +329,11 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"nil reader when none of the worker is able to serve the unsealed piece even though they have it": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -349,11 +349,11 @@ func TestReader(t *testing.T) {
|
||||
|
||||
// ---- Success for local unsealed file
|
||||
"successfully fetches reader for piece from local unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
@ -377,11 +377,11 @@ func TestReader(t *testing.T) {
|
||||
// --- Success for remote unsealed file
|
||||
// --- Success for remote unsealed file
|
||||
"successfully fetches reader from remote unsealed piece when local unsealed file does NOT have the unsealed Piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
@ -390,7 +390,7 @@ func TestReader(t *testing.T) {
|
||||
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -407,11 +407,11 @@ func TestReader(t *testing.T) {
|
||||
},
|
||||
|
||||
"successfully fetches reader for piece from remote unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -437,9 +437,9 @@ func TestReader(t *testing.T) {
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// create them mocks
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)
|
||||
index := mocks.NewMockSectorIndex(mockCtrl)
|
||||
lstore := mocks2.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl)
|
||||
index := mocks2.NewMockSectorIndex(mockCtrl)
|
||||
|
||||
if tc.storeFnc != nil {
|
||||
tc.storeFnc(lstore)
|
||||
@ -468,7 +468,7 @@ func TestReader(t *testing.T) {
|
||||
tc.indexFnc(index, tc.serverUrl)
|
||||
}
|
||||
|
||||
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
remoteStore := paths.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
|
||||
rdg, err := remoteStore.Reader(ctx, sectorRef, offset, size)
|
||||
var rd io.ReadCloser
|
||||
@ -533,9 +533,9 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
tcs := map[string]struct {
|
||||
storeFnc func(s *mocks.MockStore)
|
||||
pfFunc func(s *mocks.MockPartialFileHandler)
|
||||
indexFnc func(s *mocks.MockSectorIndex, serverURL string)
|
||||
storeFnc func(s *mocks2.MockStore)
|
||||
pfFunc func(s *mocks2.MockPartialFileHandler)
|
||||
indexFnc func(s *mocks2.MockSectorIndex, serverURL string)
|
||||
|
||||
needHttpServer bool
|
||||
|
||||
@ -550,7 +550,7 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
|
||||
// -------- have the unsealed file locally
|
||||
"fails when error while acquiring unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, xerrors.New("acquire error"))
|
||||
},
|
||||
|
||||
@ -558,22 +558,22 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when error while opening local partial (unsealed) file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, xerrors.New("pf open error"))
|
||||
},
|
||||
errStr: "pf open error",
|
||||
},
|
||||
|
||||
"fails when error while checking if local unsealed file has piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, xerrors.New("piece check error"))
|
||||
@ -583,11 +583,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
},
|
||||
|
||||
"fails when error while closing local unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
@ -601,11 +601,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
// ------------------- don't have the unsealed file locally
|
||||
|
||||
"fails when error while finding sector": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, xerrors.New("find sector error"))
|
||||
},
|
||||
@ -613,11 +613,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
},
|
||||
|
||||
"false when no worker has unsealed file": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, _ string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, _ string) {
|
||||
in.EXPECT().StorageFindSector(gomock.Any(), sectorRef.ID, storiface.FTUnsealed, gomock.Any(),
|
||||
false).Return(nil, nil)
|
||||
},
|
||||
@ -625,11 +625,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
|
||||
// false when local unsealed file does NOT have unsealed piece
|
||||
"false when local unsealed file does not have the piece and remote sector too dosen't have the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
@ -637,7 +637,7 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -651,11 +651,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
},
|
||||
|
||||
"false when none of the worker has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -670,11 +670,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
|
||||
// ---- Success for local unsealed file
|
||||
"true when local unsealed file has the piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
true, nil)
|
||||
@ -687,11 +687,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
|
||||
// --- Success for remote unsealed file
|
||||
"true if we have a remote unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, "", nil)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -706,11 +706,11 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
},
|
||||
|
||||
"true when local unsealed file does NOT have the unsealed Piece but remote sector has the unsealed piece": {
|
||||
storeFnc: func(l *mocks.MockStore) {
|
||||
storeFnc: func(l *mocks2.MockStore) {
|
||||
mockSectorAcquire(l, sectorRef, pfPath, nil)
|
||||
},
|
||||
|
||||
pfFunc: func(pf *mocks.MockPartialFileHandler) {
|
||||
pfFunc: func(pf *mocks2.MockPartialFileHandler) {
|
||||
mockPartialFileOpen(pf, sectorSize, pfPath, nil)
|
||||
mockCheckAllocation(pf, offset, size, emptyPartialFile,
|
||||
false, nil)
|
||||
@ -718,7 +718,7 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
pf.EXPECT().Close(emptyPartialFile).Return(nil).Times(1)
|
||||
},
|
||||
|
||||
indexFnc: func(in *mocks.MockSectorIndex, url string) {
|
||||
indexFnc: func(in *mocks2.MockSectorIndex, url string) {
|
||||
si := storiface.SectorStorageInfo{
|
||||
URLs: []string{url},
|
||||
}
|
||||
@ -742,9 +742,9 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
defer mockCtrl.Finish()
|
||||
|
||||
// create them mocks
|
||||
lstore := mocks.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks.NewMockPartialFileHandler(mockCtrl)
|
||||
index := mocks.NewMockSectorIndex(mockCtrl)
|
||||
lstore := mocks2.NewMockStore(mockCtrl)
|
||||
pfhandler := mocks2.NewMockPartialFileHandler(mockCtrl)
|
||||
index := mocks2.NewMockSectorIndex(mockCtrl)
|
||||
|
||||
if tc.storeFnc != nil {
|
||||
tc.storeFnc(lstore)
|
||||
@ -771,7 +771,7 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
tc.indexFnc(index, tc.serverUrl)
|
||||
}
|
||||
|
||||
remoteStore := stores.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
remoteStore := paths.NewRemote(lstore, index, nil, 6000, pfhandler)
|
||||
|
||||
isUnsealed, err := remoteStore.CheckIsUnsealed(ctx, sectorRef, offset, size)
|
||||
|
||||
@ -789,7 +789,7 @@ func TestCheckIsUnsealed(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
|
||||
func mockSectorAcquire(l *mocks2.MockStore, sectorRef storage.SectorRef, pfPath string, err error) {
|
||||
l.EXPECT().AcquireSector(gomock.Any(), sectorRef, storiface.FTUnsealed,
|
||||
storiface.FTNone, storiface.PathStorage, storiface.AcquireMove).Return(storiface.SectorPaths{
|
||||
Unsealed: pfPath,
|
||||
@ -797,18 +797,18 @@ func mockSectorAcquire(l *mocks.MockStore, sectorRef storage.SectorRef, pfPath s
|
||||
storiface.SectorPaths{}, err).Times(1)
|
||||
}
|
||||
|
||||
func mockPartialFileOpen(pf *mocks.MockPartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) {
|
||||
func mockPartialFileOpen(pf *mocks2.MockPartialFileHandler, sectorSize abi.SectorSize, pfPath string, err error) {
|
||||
pf.EXPECT().OpenPartialFile(abi.PaddedPieceSize(sectorSize), pfPath).Return(&partialfile.PartialFile{},
|
||||
err).Times(1)
|
||||
}
|
||||
|
||||
func mockCheckAllocation(pf *mocks.MockPartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile,
|
||||
func mockCheckAllocation(pf *mocks2.MockPartialFileHandler, offset, size abi.PaddedPieceSize, file *partialfile.PartialFile,
|
||||
out bool, err error) {
|
||||
pf.EXPECT().HasAllocated(file, storiface.UnpaddedByteIndex(offset.Unpadded()),
|
||||
size.Unpadded()).Return(out, err).Times(1)
|
||||
}
|
||||
|
||||
func mockPfReader(pf *mocks.MockPartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize,
|
||||
func mockPfReader(pf *mocks2.MockPartialFileHandler, file *partialfile.PartialFile, offset, size abi.PaddedPieceSize,
|
||||
outFile *os.File, err error) {
|
||||
pf.EXPECT().Reader(file, storiface.PaddedByteIndex(offset), size).Return(outFile, err)
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package stores
|
||||
package paths
|
||||
|
||||
import (
|
||||
"bytes"
|
@ -19,10 +19,10 @@ import (
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
ffiwrapper "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -55,11 +55,11 @@ type SectorManager interface {
|
||||
var ClosedWorkerID = uuid.UUID{}
|
||||
|
||||
type Manager struct {
|
||||
ls stores.LocalStorage
|
||||
storage stores.Store
|
||||
localStore *stores.Local
|
||||
remoteHnd *stores.FetchHandler
|
||||
index stores.SectorIndex
|
||||
ls paths.LocalStorage
|
||||
storage paths.Store
|
||||
localStore *paths.Local
|
||||
remoteHnd *paths.FetchHandler
|
||||
index paths.SectorIndex
|
||||
|
||||
sched *Scheduler
|
||||
windowPoStSched *poStScheduler
|
||||
@ -133,7 +133,7 @@ type StorageAuth http.Header
|
||||
type WorkerStateStore *statestore.StateStore
|
||||
type ManagerStateStore *statestore.StateStore
|
||||
|
||||
func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.LocalStorage, si stores.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
|
||||
func New(ctx context.Context, lstor *paths.Local, stor paths.Store, ls paths.LocalStorage, si paths.SectorIndex, sc Config, wss WorkerStateStore, mss ManagerStateStore) (*Manager, error) {
|
||||
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("creating prover instance: %w", err)
|
||||
@ -148,7 +148,7 @@ func New(ctx context.Context, lstor *stores.Local, stor stores.Store, ls stores.
|
||||
ls: ls,
|
||||
storage: stor,
|
||||
localStore: lstor,
|
||||
remoteHnd: &stores.FetchHandler{Local: lstor, PfHandler: &stores.DefaultPartialFileHandler{}},
|
||||
remoteHnd: &paths.FetchHandler{Local: lstor, PfHandler: &paths.DefaultPartialFileHandler{}},
|
||||
index: si,
|
||||
|
||||
sched: sh,
|
||||
@ -222,8 +222,8 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
|
||||
return xerrors.Errorf("opening local path: %w", err)
|
||||
}
|
||||
|
||||
if err := m.ls.SetStorage(func(sc *stores.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, stores.LocalPath{Path: path})
|
||||
if err := m.ls.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, paths.LocalPath{Path: path})
|
||||
}); err != nil {
|
||||
return xerrors.Errorf("get storage config: %w", err)
|
||||
}
|
||||
|
@ -29,10 +29,10 @@ import (
|
||||
proof7 "github.com/filecoin-project/specs-actors/v7/actors/runtime/proof"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
ffiwrapper "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -40,7 +40,7 @@ func init() {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
}
|
||||
|
||||
type testStorage stores.StorageConfig
|
||||
type testStorage paths.StorageConfig
|
||||
|
||||
func (t testStorage) DiskUsage(path string) (int64, error) {
|
||||
return 1, nil // close enough
|
||||
@ -51,7 +51,7 @@ func newTestStorage(t *testing.T) *testStorage {
|
||||
require.NoError(t, err)
|
||||
|
||||
{
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 1,
|
||||
CanSeal: true,
|
||||
@ -64,7 +64,7 @@ func newTestStorage(t *testing.T) *testStorage {
|
||||
}
|
||||
|
||||
return &testStorage{
|
||||
StoragePaths: []stores.LocalPath{
|
||||
StoragePaths: []paths.LocalPath{
|
||||
{Path: tp},
|
||||
},
|
||||
}
|
||||
@ -83,12 +83,12 @@ func (t testStorage) cleanup() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t testStorage) GetStorage() (stores.StorageConfig, error) {
|
||||
return stores.StorageConfig(t), nil
|
||||
func (t testStorage) GetStorage() (paths.StorageConfig, error) {
|
||||
return paths.StorageConfig(t), nil
|
||||
}
|
||||
|
||||
func (t *testStorage) SetStorage(f func(*stores.StorageConfig)) error {
|
||||
f((*stores.StorageConfig)(t))
|
||||
func (t *testStorage) SetStorage(f func(*paths.StorageConfig)) error {
|
||||
f((*paths.StorageConfig)(t))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -96,20 +96,20 @@ func (t *testStorage) Stat(path string) (fsutil.FsStat, error) {
|
||||
return fsutil.Statfs(path)
|
||||
}
|
||||
|
||||
var _ stores.LocalStorage = &testStorage{}
|
||||
var _ paths.LocalStorage = &testStorage{}
|
||||
|
||||
func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *stores.Local, *stores.Remote, *stores.Index, func()) {
|
||||
func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Manager, *paths.Local, *paths.Remote, *paths.Index, func()) {
|
||||
st := newTestStorage(t)
|
||||
|
||||
si := stores.NewIndex()
|
||||
si := paths.NewIndex()
|
||||
|
||||
lstor, err := stores.NewLocal(ctx, st, si, nil)
|
||||
lstor, err := paths.NewLocal(ctx, st, si, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
prover, err := ffiwrapper.New(&readonlyProvider{stor: lstor, index: si})
|
||||
require.NoError(t, err)
|
||||
|
||||
stor := stores.NewRemote(lstor, si, nil, 6000, &stores.DefaultPartialFileHandler{})
|
||||
stor := paths.NewRemote(lstor, si, nil, 6000, &paths.DefaultPartialFileHandler{})
|
||||
|
||||
sh, err := newScheduler("")
|
||||
require.NoError(t, err)
|
||||
@ -118,7 +118,7 @@ func newTestMgr(ctx context.Context, t *testing.T, ds datastore.Datastore) (*Man
|
||||
ls: st,
|
||||
storage: stor,
|
||||
localStore: lstor,
|
||||
remoteHnd: &stores.FetchHandler{Local: lstor},
|
||||
remoteHnd: &paths.FetchHandler{Local: lstor},
|
||||
index: si,
|
||||
|
||||
sched: sh,
|
||||
@ -692,7 +692,7 @@ func TestRestartWorker(t *testing.T) {
|
||||
|
||||
func TestReenableWorker(t *testing.T) {
|
||||
logging.SetAllLoggers(logging.LevelDebug)
|
||||
stores.HeartbeatInterval = 5 * time.Millisecond
|
||||
paths.HeartbeatInterval = 5 * time.Millisecond
|
||||
|
||||
ctx, done := context.WithCancel(context.Background())
|
||||
defer done()
|
||||
|
@ -12,8 +12,8 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fr32"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -35,12 +35,12 @@ type PieceProvider interface {
|
||||
var _ PieceProvider = &pieceProvider{}
|
||||
|
||||
type pieceProvider struct {
|
||||
storage *stores.Remote
|
||||
index stores.SectorIndex
|
||||
storage *paths.Remote
|
||||
index paths.SectorIndex
|
||||
uns Unsealer
|
||||
}
|
||||
|
||||
func NewPieceProvider(storage *stores.Remote, index stores.SectorIndex, uns Unsealer) PieceProvider {
|
||||
func NewPieceProvider(storage *paths.Remote, index paths.SectorIndex, uns Unsealer) PieceProvider {
|
||||
return &pieceProvider{
|
||||
storage: storage,
|
||||
index: index,
|
||||
|
@ -22,8 +22,8 @@ import (
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
specstorage "github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -180,13 +180,13 @@ func TestReadPieceRemoteWorkers(t *testing.T) {
|
||||
|
||||
type pieceProviderTestHarness struct {
|
||||
ctx context.Context
|
||||
index *stores.Index
|
||||
index *paths.Index
|
||||
pp PieceProvider
|
||||
sector specstorage.SectorRef
|
||||
mgr *Manager
|
||||
ticket abi.SealRandomness
|
||||
commD cid.Cid
|
||||
localStores []*stores.Local
|
||||
localStores []*paths.Local
|
||||
|
||||
servers []*http.Server
|
||||
|
||||
@ -207,11 +207,11 @@ func newPieceProviderTestHarness(t *testing.T, mgrConfig Config, sectorProofType
|
||||
require.NoError(t, err)
|
||||
|
||||
// create index, storage, local store & remote store.
|
||||
index := stores.NewIndex()
|
||||
index := paths.NewIndex()
|
||||
storage := newTestStorage(t)
|
||||
localStore, err := stores.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||
localStore, err := paths.NewLocal(ctx, storage, index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||
require.NoError(t, err)
|
||||
remoteStore := stores.NewRemote(localStore, index, nil, 6000, &stores.DefaultPartialFileHandler{})
|
||||
remoteStore := paths.NewRemote(localStore, index, nil, 6000, &paths.DefaultPartialFileHandler{})
|
||||
|
||||
// data stores for state tracking.
|
||||
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
||||
@ -261,12 +261,12 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
|
||||
nl, err := net.Listen("tcp", address)
|
||||
require.NoError(t, err)
|
||||
|
||||
localStore, err := stores.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||
localStore, err := paths.NewLocal(p.ctx, newTestStorage(t), p.index, []string{"http://" + nl.Addr().String() + "/remote"})
|
||||
require.NoError(t, err)
|
||||
|
||||
fh := &stores.FetchHandler{
|
||||
fh := &paths.FetchHandler{
|
||||
Local: localStore,
|
||||
PfHandler: &stores.DefaultPartialFileHandler{},
|
||||
PfHandler: &paths.DefaultPartialFileHandler{},
|
||||
}
|
||||
|
||||
mux := mux.NewRouter()
|
||||
@ -280,8 +280,8 @@ func (p *pieceProviderTestHarness) addRemoteWorker(t *testing.T, tasks []sealtas
|
||||
_ = svc.Serve(nl)
|
||||
}()
|
||||
|
||||
remote := stores.NewRemote(localStore, p.index, nil, 1000,
|
||||
&stores.DefaultPartialFileHandler{})
|
||||
remote := paths.NewRemote(localStore, p.index, nil, 1000,
|
||||
&paths.DefaultPartialFileHandler{})
|
||||
|
||||
dstore := ds_sync.MutexWrap(datastore.NewMapDatastore())
|
||||
csts := statestore.New(namespace.Wrap(dstore, datastore.NewKey("/stmgr/calls")))
|
||||
|
@ -7,13 +7,13 @@ import (
|
||||
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type readonlyProvider struct {
|
||||
index stores.SectorIndex
|
||||
stor *stores.Local
|
||||
index paths.SectorIndex
|
||||
stor *paths.Local
|
||||
}
|
||||
|
||||
func (l *readonlyProvider) AcquireSector(ctx context.Context, id storage.SectorRef, existing storiface.SectorFileType, allocate storiface.SectorFileType, sealing storiface.PathType) (storiface.SectorPaths, func(), error) {
|
||||
|
@ -10,8 +10,8 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -155,7 +155,7 @@ func (ps *poStScheduler) enable(wid storiface.WorkerID) {
|
||||
}
|
||||
|
||||
func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *WorkerHandle) {
|
||||
heartbeatTimer := time.NewTicker(stores.HeartbeatInterval)
|
||||
heartbeatTimer := time.NewTicker(paths.HeartbeatInterval)
|
||||
defer heartbeatTimer.Stop()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
@ -169,7 +169,7 @@ func (ps *poStScheduler) watch(wid storiface.WorkerID, worker *WorkerHandle) {
|
||||
}()
|
||||
|
||||
for {
|
||||
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
|
||||
sctx, scancel := context.WithTimeout(ctx, paths.HeartbeatInterval/2)
|
||||
curSes, err := worker.workerRpc.Session(sctx)
|
||||
scancel()
|
||||
if err != nil {
|
||||
|
@ -20,9 +20,9 @@ import (
|
||||
prooftypes "github.com/filecoin-project/go-state-types/proof"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -183,7 +183,7 @@ func (s *schedTestWorker) Close() error {
|
||||
|
||||
var _ Worker = &schedTestWorker{}
|
||||
|
||||
func addTestWorker(t *testing.T, sched *Scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
|
||||
func addTestWorker(t *testing.T, sched *Scheduler, index *paths.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
|
||||
w := &schedTestWorker{
|
||||
name: name,
|
||||
taskTypes: taskTypes,
|
||||
@ -227,7 +227,7 @@ func TestSchedStartStop(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
go sched.runSched()
|
||||
|
||||
addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
|
||||
addTestWorker(t, sched, paths.NewIndex(), "fred", nil, decentWorkerResources, false)
|
||||
|
||||
require.NoError(t, sched.Close(context.TODO()))
|
||||
}
|
||||
@ -260,13 +260,13 @@ func TestSched(t *testing.T) {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type task func(*testing.T, *Scheduler, *stores.Index, *runMeta)
|
||||
type task func(*testing.T, *Scheduler, *paths.Index, *runMeta)
|
||||
|
||||
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
|
||||
_, _, l, _ := runtime.Caller(1)
|
||||
_, _, l2, _ := runtime.Caller(2)
|
||||
|
||||
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
|
||||
return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) {
|
||||
done := make(chan struct{})
|
||||
rm.done[taskName] = done
|
||||
|
||||
@ -315,7 +315,7 @@ func TestSched(t *testing.T) {
|
||||
taskStarted := func(name string) task {
|
||||
_, _, l, _ := runtime.Caller(1)
|
||||
_, _, l2, _ := runtime.Caller(2)
|
||||
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
|
||||
return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) {
|
||||
select {
|
||||
case rm.done[name] <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
@ -327,7 +327,7 @@ func TestSched(t *testing.T) {
|
||||
taskDone := func(name string) task {
|
||||
_, _, l, _ := runtime.Caller(1)
|
||||
_, _, l2, _ := runtime.Caller(2)
|
||||
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
|
||||
return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) {
|
||||
select {
|
||||
case rm.done[name] <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
@ -340,7 +340,7 @@ func TestSched(t *testing.T) {
|
||||
taskNotScheduled := func(name string) task {
|
||||
_, _, l, _ := runtime.Caller(1)
|
||||
_, _, l2, _ := runtime.Caller(2)
|
||||
return func(t *testing.T, sched *Scheduler, index *stores.Index, rm *runMeta) {
|
||||
return func(t *testing.T, sched *Scheduler, index *paths.Index, rm *runMeta) {
|
||||
select {
|
||||
case rm.done[name] <- struct{}{}:
|
||||
t.Fatal("not expected", l, l2)
|
||||
@ -351,7 +351,7 @@ func TestSched(t *testing.T) {
|
||||
|
||||
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
index := stores.NewIndex()
|
||||
index := paths.NewIndex()
|
||||
|
||||
sched, err := newScheduler("")
|
||||
require.NoError(t, err)
|
||||
@ -380,7 +380,7 @@ func TestSched(t *testing.T) {
|
||||
}
|
||||
|
||||
multTask := func(tasks ...task) task {
|
||||
return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) {
|
||||
return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) {
|
||||
for _, tsk := range tasks {
|
||||
tsk(t, s, index, meta)
|
||||
}
|
||||
@ -494,7 +494,7 @@ func TestSched(t *testing.T) {
|
||||
}
|
||||
|
||||
diag := func() task {
|
||||
return func(t *testing.T, s *Scheduler, index *stores.Index, meta *runMeta) {
|
||||
return func(t *testing.T, s *Scheduler, index *paths.Index, meta *runMeta) {
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
for _, request := range s.diag().Requests {
|
||||
log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)
|
||||
|
@ -6,8 +6,8 @@ import (
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -66,7 +66,7 @@ func (sh *Scheduler) runWorker(ctx context.Context, wid storiface.WorkerID, work
|
||||
|
||||
wid: wid,
|
||||
|
||||
heartbeatTimer: time.NewTicker(stores.HeartbeatInterval),
|
||||
heartbeatTimer: time.NewTicker(paths.HeartbeatInterval),
|
||||
scheduledWindows: make(chan *SchedWindow, SchedWindows),
|
||||
taskDone: make(chan struct{}, 1),
|
||||
|
||||
@ -200,7 +200,7 @@ func (sw *schedWorker) disable(ctx context.Context) error {
|
||||
|
||||
func (sw *schedWorker) checkSession(ctx context.Context) bool {
|
||||
for {
|
||||
sctx, scancel := context.WithTimeout(ctx, stores.HeartbeatInterval/2)
|
||||
sctx, scancel := context.WithTimeout(ctx, paths.HeartbeatInterval/2)
|
||||
curSes, err := sw.worker.workerRpc.Session(sctx)
|
||||
scancel()
|
||||
if err != nil {
|
||||
|
@ -7,18 +7,18 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type allocSelector struct {
|
||||
index stores.SectorIndex
|
||||
index paths.SectorIndex
|
||||
alloc storiface.SectorFileType
|
||||
ptype storiface.PathType
|
||||
}
|
||||
|
||||
func newAllocSelector(index stores.SectorIndex, alloc storiface.SectorFileType, ptype storiface.PathType) *allocSelector {
|
||||
func newAllocSelector(index paths.SectorIndex, alloc storiface.SectorFileType, ptype storiface.PathType) *allocSelector {
|
||||
return &allocSelector{
|
||||
index: index,
|
||||
alloc: alloc,
|
||||
|
@ -7,19 +7,19 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type existingSelector struct {
|
||||
index stores.SectorIndex
|
||||
index paths.SectorIndex
|
||||
sector abi.SectorID
|
||||
alloc storiface.SectorFileType
|
||||
allowFetch bool
|
||||
}
|
||||
|
||||
func newExistingSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, allowFetch bool) *existingSelector {
|
||||
func newExistingSelector(index paths.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, allowFetch bool) *existingSelector {
|
||||
return &existingSelector{
|
||||
index: index,
|
||||
sector: sector,
|
||||
|
@ -7,20 +7,20 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type moveSelector struct {
|
||||
index stores.SectorIndex
|
||||
index paths.SectorIndex
|
||||
sector abi.SectorID
|
||||
alloc storiface.SectorFileType
|
||||
destPtype storiface.PathType
|
||||
allowRemote bool
|
||||
}
|
||||
|
||||
func newMoveSelector(index stores.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
|
||||
func newMoveSelector(index paths.SectorIndex, sector abi.SectorID, alloc storiface.SectorFileType, destPtype storiface.PathType, allowRemote bool) *moveSelector {
|
||||
return &moveSelector{
|
||||
index: index,
|
||||
sector: sector,
|
||||
|
@ -10,15 +10,15 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/mock"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type testWorker struct {
|
||||
acceptTasks map[sealtasks.TaskType]struct{}
|
||||
lstor *stores.Local
|
||||
lstor *paths.Local
|
||||
ret storiface.WorkerReturn
|
||||
|
||||
mockSeal *mock.SectorMgr
|
||||
@ -32,7 +32,7 @@ type testWorker struct {
|
||||
Worker
|
||||
}
|
||||
|
||||
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
|
||||
func newTestWorker(wcfg WorkerConfig, lstor *paths.Local, ret storiface.WorkerReturn) *testWorker {
|
||||
acceptTasks := map[sealtasks.TaskType]struct{}{}
|
||||
for _, taskType := range wcfg.TaskTypes {
|
||||
acceptTasks[taskType] = struct{}{}
|
||||
|
@ -23,9 +23,9 @@ import (
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
ffiwrapper "github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
stores "github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
storiface "github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -49,9 +49,9 @@ type ExecutorFunc func() (ffiwrapper.Storage, error)
|
||||
type EnvFunc func(string) (string, bool)
|
||||
|
||||
type LocalWorker struct {
|
||||
storage stores.Store
|
||||
localStore *stores.Local
|
||||
sindex stores.SectorIndex
|
||||
storage paths.Store
|
||||
localStore *paths.Local
|
||||
sindex paths.SectorIndex
|
||||
ret storiface.WorkerReturn
|
||||
executor ExecutorFunc
|
||||
noSwap bool
|
||||
@ -73,7 +73,7 @@ type LocalWorker struct {
|
||||
closing chan struct{}
|
||||
}
|
||||
|
||||
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
|
||||
func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
|
||||
acceptTasks := map[sealtasks.TaskType]struct{}{}
|
||||
for _, taskType := range wcfg.TaskTypes {
|
||||
acceptTasks[taskType] = struct{}{}
|
||||
@ -135,7 +135,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, envLookup EnvFunc,
|
||||
return w
|
||||
}
|
||||
|
||||
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
|
||||
func NewLocalWorker(wcfg WorkerConfig, store paths.Store, local *paths.Local, sindex paths.SectorIndex, ret storiface.WorkerReturn, cst *statestore.StateStore) *LocalWorker {
|
||||
return newLocalWorker(nil, wcfg, os.LookupEnv, store, local, sindex, ret, cst)
|
||||
}
|
||||
|
||||
|
@ -11,12 +11,12 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-statestore"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
type hangStore struct {
|
||||
stores.Store
|
||||
paths.Store
|
||||
|
||||
challengeReads chan struct{}
|
||||
unhang chan struct{}
|
||||
|
@ -41,7 +41,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/stores"
|
||||
"github.com/filecoin-project/lotus/storage/paths"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
@ -198,9 +198,9 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
|
||||
}
|
||||
}
|
||||
|
||||
var localPaths []stores.LocalPath
|
||||
var localPaths []paths.LocalPath
|
||||
|
||||
b, err := json.MarshalIndent(&stores.LocalStorageMeta{
|
||||
b, err := json.MarshalIndent(&paths.LocalStorageMeta{
|
||||
ID: storiface.ID(uuid.New().String()),
|
||||
Weight: 10,
|
||||
CanSeal: true,
|
||||
@ -214,11 +214,11 @@ func PrepareMiner(t *TestEnvironment) (*LotusMiner, error) {
|
||||
return nil, fmt.Errorf("persisting storage metadata (%s): %w", filepath.Join(lr.Path(), "sectorstore.json"), err)
|
||||
}
|
||||
|
||||
localPaths = append(localPaths, stores.LocalPath{
|
||||
localPaths = append(localPaths, paths.LocalPath{
|
||||
Path: lr.Path(),
|
||||
})
|
||||
|
||||
if err := lr.SetStorage(func(sc *stores.StorageConfig) {
|
||||
if err := lr.SetStorage(func(sc *paths.StorageConfig) {
|
||||
sc.StoragePaths = append(sc.StoragePaths, localPaths...)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
Loading…
Reference in New Issue
Block a user