Merge pull request #9032 from filecoin-project/feat/storage-redeclare
feat: sealing: storage redeclare/detach
commit 881e16ec75
@ -937,6 +937,11 @@ workflows:
        suite: itest-nonce
        target: "./itests/nonce_test.go"

    - test:
        name: test-itest-path_detach_redeclare
        suite: itest-path_detach_redeclare
        target: "./itests/path_detach_redeclare_test.go"

    - test:
        name: test-itest-path_type_filters
        suite: itest-path_type_filters
@ -155,6 +155,7 @@ type StorageMiner interface {

    // paths.SectorIndex
    StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error //perm:admin
    StorageDetach(ctx context.Context, id storiface.ID, url string) error //perm:admin
    StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) //perm:admin
    StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error //perm:admin
    StorageDeclareSector(ctx context.Context, storageID storiface.ID, s abi.SectorID, ft storiface.SectorFileType, primary bool) error //perm:admin
@ -182,6 +183,10 @@ type StorageMiner interface {

    StorageAuthVerify(ctx context.Context, token string) ([]auth.Permission, error) //perm:read

    StorageAddLocal(ctx context.Context, path string) error //perm:admin
    StorageDetachLocal(ctx context.Context, path string) error //perm:admin
    StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error //perm:admin

    MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
    MarketListDeals(ctx context.Context) ([]*MarketDeal, error) //perm:read
    MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read
@ -279,8 +284,6 @@ type StorageMiner interface {
    DealsConsiderUnverifiedStorageDeals(context.Context) (bool, error) //perm:admin
    DealsSetConsiderUnverifiedStorageDeals(context.Context, bool) error //perm:admin

    StorageAddLocal(ctx context.Context, path string) error //perm:admin

    PiecesListPieces(ctx context.Context) ([]cid.Cid, error) //perm:read
    PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) //perm:read
    PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) //perm:read
@ -59,7 +59,10 @@ type Worker interface {
    // Storage / Other
    Remove(ctx context.Context, sector abi.SectorID) error //perm:admin

    StorageLocal(ctx context.Context) (map[storiface.ID]string, error) //perm:admin
    StorageAddLocal(ctx context.Context, path string) error //perm:admin
    StorageDetachLocal(ctx context.Context, path string) error //perm:admin
    StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error //perm:admin

    // SetEnabled marks the worker as enabled/disabled. Note that this setting
    // may take a few seconds to propagate to task scheduler
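The worker gains the same trio plus `StorageLocal`; a hedged sketch against a connected `api.Worker` client (hypothetical `wapi`):

```go
// Sketch only: list the worker's local paths as a map of path ID -> filesystem path.
paths, err := wapi.StorageLocal(ctx)
if err != nil {
	return err
}
fmt.Println(paths) // e.g. map[<uuid>:/data/path]
```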
@ -272,6 +272,8 @@ func init() {
            Read: [storiface.FileTypes]uint{2, 3, 0},
        },
    })
    storifaceid := storiface.ID("1399aa04-2625-44b1-bad4-bd07b59b22c4")
    addExample(&storifaceid)

    // worker specific
    addExample(storiface.AcquireMove)
@ -864,6 +864,10 @@ type StorageMinerStruct struct {

        StorageDeclareSector func(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType, p4 bool) error `perm:"admin"`

        StorageDetach func(p0 context.Context, p1 storiface.ID, p2 string) error `perm:"admin"`

        StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"`

        StorageDropSector func(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType) error `perm:"admin"`

        StorageFindSector func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType, p3 abi.SectorSize, p4 bool) ([]storiface.SectorStorageInfo, error) `perm:"admin"`
@ -878,6 +882,8 @@ type StorageMinerStruct struct {

        StorageLock func(p0 context.Context, p1 abi.SectorID, p2 storiface.SectorFileType, p3 storiface.SectorFileType) error `perm:"admin"`

        StorageRedeclareLocal func(p0 context.Context, p1 *storiface.ID, p2 bool) error `perm:"admin"`

        StorageReportHealth func(p0 context.Context, p1 storiface.ID, p2 storiface.HealthReport) error `perm:"admin"`

        StorageStat func(p0 context.Context, p1 storiface.ID) (fsutil.FsStat, error) `perm:"admin"`
@ -973,6 +979,12 @@ type WorkerStruct struct {

        StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"`

        StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"`

        StorageLocal func(p0 context.Context) (map[storiface.ID]string, error) `perm:"admin"`

        StorageRedeclareLocal func(p0 context.Context, p1 *storiface.ID, p2 bool) error `perm:"admin"`

        TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`

        TaskEnable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`
@ -5104,6 +5116,28 @@ func (s *StorageMinerStub) StorageDeclareSector(p0 context.Context, p1 storiface
    return ErrNotSupported
}

func (s *StorageMinerStruct) StorageDetach(p0 context.Context, p1 storiface.ID, p2 string) error {
    if s.Internal.StorageDetach == nil {
        return ErrNotSupported
    }
    return s.Internal.StorageDetach(p0, p1, p2)
}

func (s *StorageMinerStub) StorageDetach(p0 context.Context, p1 storiface.ID, p2 string) error {
    return ErrNotSupported
}

func (s *StorageMinerStruct) StorageDetachLocal(p0 context.Context, p1 string) error {
    if s.Internal.StorageDetachLocal == nil {
        return ErrNotSupported
    }
    return s.Internal.StorageDetachLocal(p0, p1)
}

func (s *StorageMinerStub) StorageDetachLocal(p0 context.Context, p1 string) error {
    return ErrNotSupported
}

func (s *StorageMinerStruct) StorageDropSector(p0 context.Context, p1 storiface.ID, p2 abi.SectorID, p3 storiface.SectorFileType) error {
    if s.Internal.StorageDropSector == nil {
        return ErrNotSupported
@ -5181,6 +5215,17 @@ func (s *StorageMinerStub) StorageLock(p0 context.Context, p1 abi.SectorID, p2 s
    return ErrNotSupported
}

func (s *StorageMinerStruct) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error {
    if s.Internal.StorageRedeclareLocal == nil {
        return ErrNotSupported
    }
    return s.Internal.StorageRedeclareLocal(p0, p1, p2)
}

func (s *StorageMinerStub) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error {
    return ErrNotSupported
}

func (s *StorageMinerStruct) StorageReportHealth(p0 context.Context, p1 storiface.ID, p2 storiface.HealthReport) error {
    if s.Internal.StorageReportHealth == nil {
        return ErrNotSupported
@ -5610,6 +5655,39 @@ func (s *WorkerStub) StorageAddLocal(p0 context.Context, p1 string) error {
    return ErrNotSupported
}

func (s *WorkerStruct) StorageDetachLocal(p0 context.Context, p1 string) error {
    if s.Internal.StorageDetachLocal == nil {
        return ErrNotSupported
    }
    return s.Internal.StorageDetachLocal(p0, p1)
}

func (s *WorkerStub) StorageDetachLocal(p0 context.Context, p1 string) error {
    return ErrNotSupported
}

func (s *WorkerStruct) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) {
    if s.Internal.StorageLocal == nil {
        return *new(map[storiface.ID]string), ErrNotSupported
    }
    return s.Internal.StorageLocal(p0)
}

func (s *WorkerStub) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) {
    return *new(map[storiface.ID]string), ErrNotSupported
}

func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error {
    if s.Internal.StorageRedeclareLocal == nil {
        return ErrNotSupported
    }
    return s.Internal.StorageRedeclareLocal(p0, p1, p2)
}

func (s *WorkerStub) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error {
    return ErrNotSupported
}

func (s *WorkerStruct) TaskDisable(p0 context.Context, p1 sealtasks.TaskType) error {
    if s.Internal.TaskDisable == nil {
        return ErrNotSupported
Binary file not shown.
@ -46,6 +46,8 @@ long term for proving (references as 'store') as well as how sectors will be
stored while moving through the sealing pipeline (references as 'seal').`,
    Subcommands: []*cli.Command{
        storageAttachCmd,
        storageDetachCmd,
        storageRedeclareCmd,
        storageListCmd,
        storageFindCmd,
        storageCleanupCmd,
@ -174,6 +176,82 @@ over time
    },
}

var storageDetachCmd = &cli.Command{
    Name:  "detach",
    Usage: "detach local storage path",
    Flags: []cli.Flag{
        &cli.BoolFlag{
            Name: "really-do-it",
        },
    },
    ArgsUsage: "[path]",
    Action: func(cctx *cli.Context) error {
        nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
        if err != nil {
            return err
        }
        defer closer()
        ctx := lcli.ReqContext(cctx)

        if !cctx.Args().Present() {
            return xerrors.Errorf("must specify storage path")
        }

        p, err := homedir.Expand(cctx.Args().First())
        if err != nil {
            return xerrors.Errorf("expanding path: %w", err)
        }

        if !cctx.Bool("really-do-it") {
            return xerrors.Errorf("pass --really-do-it to execute the action")
        }

        return nodeApi.StorageDetachLocal(ctx, p)
    },
}

var storageRedeclareCmd = &cli.Command{
    Name:  "redeclare",
    Usage: "redeclare sectors in a local storage path",
    Flags: []cli.Flag{
        &cli.StringFlag{
            Name:  "id",
            Usage: "storage path ID",
        },
        &cli.BoolFlag{
            Name:  "all",
            Usage: "redeclare all storage paths",
        },
        &cli.BoolFlag{
            Name:  "drop-missing",
            Usage: "Drop index entries with missing files",
        },
    },
    Action: func(cctx *cli.Context) error {
        nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
        if err != nil {
            return err
        }
        defer closer()
        ctx := lcli.ReqContext(cctx)

        if cctx.IsSet("id") && cctx.Bool("all") {
            return xerrors.Errorf("--id and --all can't be passed at the same time")
        }

        if cctx.IsSet("id") {
            id := storiface.ID(cctx.String("id"))
            return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
        }

        if cctx.Bool("all") {
            return nodeApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing"))
        }

        return xerrors.Errorf("either --all or --id must be specified")
    },
}
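Taken together, the new subcommands might be used like this (path and ID are illustrative):

```
lotus-miner storage detach --really-do-it /mnt/old-storage
lotus-miner storage redeclare --id 1399aa04-2625-44b1-bad4-bd07b59b22c4
lotus-miner storage redeclare --all --drop-missing
```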

var storageListCmd = &cli.Command{
    Name:  "list",
    Usage: "list local storage paths",
@ -619,7 +619,7 @@ var runCmd = &cli.Command{
        if redeclareStorage {
            log.Info("Redeclaring local storage")

            if err := localStore.Redeclare(ctx); err != nil {
            if err := localStore.Redeclare(ctx, nil, false); err != nil {
                log.Errorf("Redeclaring local storage failed: %+v", err)

                select {
@ -65,6 +65,20 @@ func (w *Worker) Version(context.Context) (api.Version, error) {
    return api.WorkerAPIVersion0, nil
}

func (w *Worker) StorageLocal(ctx context.Context) (map[storiface.ID]string, error) {
    l, err := w.LocalStore.Local(ctx)
    if err != nil {
        return nil, err
    }

    out := map[storiface.ID]string{}
    for _, st := range l {
        out[st.ID] = st.LocalPath
    }

    return out, nil
}

func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
    path, err := homedir.Expand(path)
    if err != nil {
@ -84,6 +98,58 @@ func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
    return nil
}

func (w *Worker) StorageDetachLocal(ctx context.Context, path string) error {
    path, err := homedir.Expand(path)
    if err != nil {
        return xerrors.Errorf("expanding local path: %w", err)
    }

    // check that we have the path opened
    lps, err := w.LocalStore.Local(ctx)
    if err != nil {
        return xerrors.Errorf("getting local path list: %w", err)
    }

    var localPath *storiface.StoragePath
    for _, lp := range lps {
        if lp.LocalPath == path {
            lp := lp // copy to make the linter happy
            localPath = &lp
            break
        }
    }
    if localPath == nil {
        return xerrors.Errorf("no local paths match '%s'", path)
    }

    // drop from the persisted storage.json
    var found bool
    if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) {
        out := make([]paths.LocalPath, 0, len(sc.StoragePaths))
        for _, storagePath := range sc.StoragePaths {
            if storagePath.Path != path {
                out = append(out, storagePath)
                continue
            }
            found = true
        }
        sc.StoragePaths = out
    }); err != nil {
        return xerrors.Errorf("set storage config: %w", err)
    }
    if !found {
        // maybe this is fine?
        return xerrors.Errorf("path not found in storage.json")
    }

    // unregister locally, drop from sector index
    return w.LocalStore.ClosePath(ctx, localPath.ID)
}

func (w *Worker) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error {
    return w.LocalStore.Redeclare(ctx, id, dropMissing)
}

func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error {
    disabled := int64(1)
    if enabled {

@ -123,3 +189,4 @@ func (w *Worker) Shutdown(ctx context.Context) error {
}

var _ storiface.WorkerCalls = &Worker{}
var _ api.Worker = &Worker{}
@ -24,6 +24,8 @@ var storageCmd = &cli.Command{
    Usage: "manage sector storage",
    Subcommands: []*cli.Command{
        storageAttachCmd,
        storageDetachCmd,
        storageRedeclareCmd,
    },
}
@ -128,3 +130,79 @@ var storageAttachCmd = &cli.Command{
        return nodeApi.StorageAddLocal(ctx, p)
    },
}

var storageDetachCmd = &cli.Command{
    Name:  "detach",
    Usage: "detach local storage path",
    Flags: []cli.Flag{
        &cli.BoolFlag{
            Name: "really-do-it",
        },
    },
    ArgsUsage: "[path]",
    Action: func(cctx *cli.Context) error {
        nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
        if err != nil {
            return err
        }
        defer closer()
        ctx := lcli.ReqContext(cctx)

        if !cctx.Args().Present() {
            return xerrors.Errorf("must specify storage path")
        }

        p, err := homedir.Expand(cctx.Args().First())
        if err != nil {
            return xerrors.Errorf("expanding path: %w", err)
        }

        if !cctx.Bool("really-do-it") {
            return xerrors.Errorf("pass --really-do-it to execute the action")
        }

        return nodeApi.StorageDetachLocal(ctx, p)
    },
}

var storageRedeclareCmd = &cli.Command{
    Name:  "redeclare",
    Usage: "redeclare sectors in a local storage path",
    Flags: []cli.Flag{
        &cli.StringFlag{
            Name:  "id",
            Usage: "storage path ID",
        },
        &cli.BoolFlag{
            Name:  "all",
            Usage: "redeclare all storage paths",
        },
        &cli.BoolFlag{
            Name:  "drop-missing",
            Usage: "Drop index entries with missing files",
        },
    },
    Action: func(cctx *cli.Context) error {
        nodeApi, closer, err := lcli.GetWorkerAPI(cctx)
        if err != nil {
            return err
        }
        defer closer()
        ctx := lcli.ReqContext(cctx)

        if cctx.IsSet("id") && cctx.Bool("all") {
            return xerrors.Errorf("--id and --all can't be passed at the same time")
        }

        if cctx.IsSet("id") {
            id := storiface.ID(cctx.String("id"))
            return nodeApi.StorageRedeclareLocal(ctx, &id, cctx.Bool("drop-missing"))
        }

        if cctx.Bool("all") {
            return nodeApi.StorageRedeclareLocal(ctx, nil, cctx.Bool("drop-missing"))
        }

        return xerrors.Errorf("either --all or --id must be specified")
    },
}
@ -162,6 +162,8 @@
* [StorageAuthVerify](#StorageAuthVerify)
* [StorageBestAlloc](#StorageBestAlloc)
* [StorageDeclareSector](#StorageDeclareSector)
* [StorageDetach](#StorageDetach)
* [StorageDetachLocal](#StorageDetachLocal)
* [StorageDropSector](#StorageDropSector)
* [StorageFindSector](#StorageFindSector)
* [StorageGetLocks](#StorageGetLocks)

@ -169,6 +171,7 @@
* [StorageList](#StorageList)
* [StorageLocal](#StorageLocal)
* [StorageLock](#StorageLock)
* [StorageRedeclareLocal](#StorageRedeclareLocal)
* [StorageReportHealth](#StorageReportHealth)
* [StorageStat](#StorageStat)
* [StorageTryLock](#StorageTryLock)
@ -3437,6 +3440,35 @@ Inputs:

Response: `{}`

### StorageDetach


Perms: admin

Inputs:
```json
[
  "76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8",
  "string value"
]
```

Response: `{}`
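The second parameter is the URL under which the path is served; as the `Index.StorageDetach` implementation later in this diff shows, the path is only dropped from the index entirely when that is its last remaining URL. A hedged Go sketch with illustrative values:

```go
// Sketch only: napi is assumed to be a connected api.StorageMiner client.
id := storiface.ID("76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8")
err := napi.StorageDetach(ctx, id, "http://127.0.0.1:2345/remote") // URL is illustrative
```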

### StorageDetachLocal


Perms: admin

Inputs:
```json
[
  "string value"
]
```

Response: `{}`

### StorageDropSector

@ -3633,6 +3665,21 @@ Inputs:

Response: `{}`

### StorageRedeclareLocal


Perms: admin

Inputs:
```json
[
  "1399aa04-2625-44b1-bad4-bd07b59b22c4",
  true
]
```

Response: `{}`
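Per the CLI wiring earlier in this diff, the ID may also be `null` to redeclare every local path; the boolean selects whether index entries with missing files are dropped. For example:

```json
[
  null,
  false
]
```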

### StorageReportHealth

@ -39,6 +39,9 @@
* [SetEnabled](#SetEnabled)
* [Storage](#Storage)
* [StorageAddLocal](#StorageAddLocal)
* [StorageDetachLocal](#StorageDetachLocal)
* [StorageLocal](#StorageLocal)
* [StorageRedeclareLocal](#StorageRedeclareLocal)
* [Task](#Task)
* [TaskDisable](#TaskDisable)
* [TaskEnable](#TaskEnable)
@ -2118,6 +2121,49 @@ Inputs:

Response: `{}`

### StorageDetachLocal


Perms: admin

Inputs:
```json
[
  "string value"
]
```

Response: `{}`

### StorageLocal


Perms: admin

Inputs: `null`

Response:
```json
{
  "76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8": "/data/path"
}
```

### StorageRedeclareLocal


Perms: admin

Inputs:
```json
[
  "1399aa04-2625-44b1-bad4-bd07b59b22c4",
  true
]
```

Response: `{}`

## Task

@ -2152,6 +2152,8 @@ DESCRIPTION:

COMMANDS:
   attach     attach local storage path
   detach     detach local storage path
   redeclare  redeclare sectors in a local storage path
   list       list local storage paths
   find       find sector in the storage system
   cleanup    trigger cleanup actions

@ -2202,6 +2204,34 @@ OPTIONS:

```

### lotus-miner storage detach
```
NAME:
   lotus-miner storage detach - detach local storage path

USAGE:
   lotus-miner storage detach [command options] [path]

OPTIONS:
   --really-do-it  (default: false)

```

### lotus-miner storage redeclare
```
NAME:
   lotus-miner storage redeclare - redeclare sectors in a local storage path

USAGE:
   lotus-miner storage redeclare [command options] [arguments...]

OPTIONS:
   --all           redeclare all storage paths (default: false)
   --drop-missing  Drop index entries with missing files (default: false)
   --id value      storage path ID

```
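For example (ID is illustrative):
```
lotus-miner storage redeclare --id 1399aa04-2625-44b1-bad4-bd07b59b22c4 --drop-missing
lotus-miner storage redeclare --all
```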

### lotus-miner storage list
```
NAME:
@ -96,6 +96,8 @@ USAGE:

COMMANDS:
   attach     attach local storage path
   detach     detach local storage path
   redeclare  redeclare sectors in a local storage path
   help, h    Shows a list of commands or help for one command

OPTIONS:

@ -122,6 +124,34 @@ OPTIONS:

```

### lotus-worker storage detach
```
NAME:
   lotus-worker storage detach - detach local storage path

USAGE:
   lotus-worker storage detach [command options] [path]

OPTIONS:
   --really-do-it  (default: false)

```

### lotus-worker storage redeclare
```
NAME:
   lotus-worker storage redeclare - redeclare sectors in a local storage path

USAGE:
   lotus-worker storage redeclare [command options] [arguments...]

OPTIONS:
   --all           redeclare all storage paths (default: false)
   --drop-missing  Drop index entries with missing files (default: false)
   --id value      storage path ID

```
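For example (path is illustrative):
```
lotus-worker storage detach --really-do-it /mnt/seal-scratch
lotus-worker storage redeclare --all --drop-missing
```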

## lotus-worker set
```
NAME:
@ -74,7 +74,7 @@ func TestBatchDealInput(t *testing.T) {
        require.NoError(t, err)

        checkNoPadding := func() {
            sl, err := miner.SectorsList(ctx)
            sl, err := miner.SectorsListNonGenesis(ctx)
            require.NoError(t, err)

            sort.Slice(sl, func(i, j int) bool {

@ -125,7 +125,7 @@ func TestBatchDealInput(t *testing.T) {

        checkNoPadding()

        sl, err := miner.SectorsList(ctx)
        sl, err := miner.SectorsListNonGenesis(ctx)
        require.NoError(t, err)
        require.Equal(t, len(sl), expectSectors)
    }
@ -60,11 +60,11 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
        t.Fatal(err)
    }

    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner)
    fmt.Printf("CCUpgrade: %d\n", CCUpgrade)

    miner.PledgeSectors(ctx, 1, 0, nil)
    sl, err := miner.SectorsList(ctx)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
    require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")

@ -79,7 +79,7 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
    err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
    require.NoError(t, err)

    sl, err = miner.SectorsList(ctx)
    sl, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
@ -308,7 +308,7 @@ func TestDeadlineToggling(t *testing.T) {
    // terminate sectors on minerD
    {
        var terminationDeclarationParams []miner2.TerminationDeclaration
        secs, err := minerD.SectorsList(ctx)
        secs, err := minerD.SectorsListNonGenesis(ctx)
        require.NoError(t, err)
        require.Len(t, secs, sectorsD)
@ -70,7 +70,7 @@ func TestQuotePriceForUnsealedRetrieval(t *testing.T) {
    //stm: @STORAGE_LIST_001, @MINER_SECTOR_LIST_001
    ss, err := miner.StorageList(context.Background())
    require.NoError(t, err)
    _, err = miner.SectorsList(ctx)
    _, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)

    //stm: @STORAGE_DROP_SECTOR_001, @STORAGE_LIST_001

@ -95,7 +95,7 @@ iLoop:
    // remove the other unsealed file as well
    ss, err = miner.StorageList(context.Background())
    require.NoError(t, err)
    _, err = miner.SectorsList(ctx)
    _, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    for storeID, sd := range ss {
        for _, sector := range sd {
@ -292,7 +292,7 @@ func (dh *DealHarness) WaitDealPublished(ctx context.Context, deal *cid.Cid) {
}

func (dh *DealHarness) StartSealingWaiting(ctx context.Context) {
    snums, err := dh.main.SectorsList(ctx)
    snums, err := dh.main.SectorsListNonGenesis(ctx)
    require.NoError(dh.t, err)
    for _, snum := range snums {
        si, err := dh.main.SectorsStatus(ctx, snum, false)
@ -4,6 +4,7 @@ import (
    "bytes"
    "context"
    "crypto/rand"
    "encoding/binary"
    "fmt"
    "io/ioutil"
    "net"

@ -20,13 +21,13 @@ import (
    "github.com/stretchr/testify/require"

    "github.com/filecoin-project/go-address"
    cborutil "github.com/filecoin-project/go-cbor-util"
    "github.com/filecoin-project/go-state-types/abi"
    "github.com/filecoin-project/go-state-types/big"
    "github.com/filecoin-project/go-state-types/builtin"
    "github.com/filecoin-project/go-state-types/exitcode"
    "github.com/filecoin-project/go-state-types/network"
    "github.com/filecoin-project/go-statestore"
    "github.com/filecoin-project/go-storedcounter"
    miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
    power3 "github.com/filecoin-project/specs-actors/v3/actors/builtin/power"

@ -56,6 +57,7 @@ import (
    testing2 "github.com/filecoin-project/lotus/node/modules/testing"
    "github.com/filecoin-project/lotus/node/repo"
    "github.com/filecoin-project/lotus/storage/paths"
    pipeline "github.com/filecoin-project/lotus/storage/pipeline"
    sectorstorage "github.com/filecoin-project/lotus/storage/sealer"
    "github.com/filecoin-project/lotus/storage/sealer/mock"
    "github.com/filecoin-project/lotus/storage/sealer/storiface"
@ -233,9 +235,12 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO
    }

    ownerKey := options.ownerKey

    var presealSectors int

    if !n.bootstrapped {
        presealSectors = options.sectors

        var (
            sectors = options.sectors
            k       *types.KeyInfo
            genm    *genesis.Miner
        )

@ -246,9 +251,9 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO

        // Create the preseal commitment.
        if n.options.mockProofs {
            genm, k, err = mock.PreSeal(proofType, actorAddr, sectors)
            genm, k, err = mock.PreSeal(proofType, actorAddr, presealSectors)
        } else {
            genm, k, err = seed.PreSeal(actorAddr, proofType, 0, sectors, tdir, []byte("make genesis mem random"), nil, true)
            genm, k, err = seed.PreSeal(actorAddr, proofType, 0, presealSectors, tdir, []byte("make genesis mem random"), nil, true)
        }
        require.NoError(n.t, err)

@ -279,6 +284,7 @@ func (n *Ensemble) Miner(minerNode *TestMiner, full *TestFullNode, opts ...NodeO
        OwnerKey:       ownerKey,
        FullNode:       full,
        PresealDir:     tdir,
        PresealSectors: presealSectors,
        options:        options,
        RemoteListener: rl,
    }
|
||||
err = ds.Put(ctx, datastore.NewKey("miner-address"), m.ActorAddr.Bytes())
|
||||
require.NoError(n.t, err)
|
||||
|
||||
nic := storedcounter.New(ds, datastore.NewKey(modules.StorageCounterDSPrefix))
|
||||
for i := 0; i < m.options.sectors; i++ {
|
||||
_, err := nic.Next()
|
||||
require.NoError(n.t, err)
|
||||
if i < len(n.genesis.miners) && !n.bootstrapped {
|
||||
// if this is a genesis miner, import preseal metadata
|
||||
require.NoError(n.t, importPreSealMeta(ctx, n.genesis.miners[i], ds))
|
||||
}
|
||||
_, err = nic.Next()
|
||||
require.NoError(n.t, err)
|
||||
|
||||
// using real proofs, therefore need real sectors.
|
||||
if !n.bootstrapped && !n.options.mockProofs {
|
||||
@ -913,3 +916,46 @@ func (n *Ensemble) generateGenesis() *genesis.Template {

    return templ
}

func importPreSealMeta(ctx context.Context, meta genesis.Miner, mds dtypes.MetadataDS) error {
    maxSectorID := abi.SectorNumber(0)
    for _, sector := range meta.Sectors {
        sectorKey := datastore.NewKey(pipeline.SectorStorePrefix).ChildString(fmt.Sprint(sector.SectorID))

        commD := sector.CommD
        commR := sector.CommR

        info := &pipeline.SectorInfo{
            State:        pipeline.Proving,
            SectorNumber: sector.SectorID,
            Pieces: []pipeline.Piece{
                {
                    Piece: abi.PieceInfo{
                        Size:     abi.PaddedPieceSize(meta.SectorSize),
                        PieceCID: commD,
                    },
                    DealInfo: nil, // todo: likely possible to get, but not really that useful
                },
            },
            CommD: &commD,
            CommR: &commR,
        }

        b, err := cborutil.Dump(info)
        if err != nil {
            return err
        }

        if err := mds.Put(ctx, sectorKey, b); err != nil {
            return err
        }

        if sector.SectorID > maxSectorID {
            maxSectorID = sector.SectorID
        }
    }

    buf := make([]byte, binary.MaxVarintLen64)
    size := binary.PutUvarint(buf, uint64(maxSectorID))
    return mds.Put(ctx, datastore.NewKey(modules.StorageCounterDSPrefix), buf[:size])
}
@ -8,6 +8,7 @@ import (
    "net"
    "os"
    "path/filepath"
    "sort"
    "strings"
    "testing"
    "time"

@ -78,6 +79,7 @@ type TestMiner struct {

    FullNode       *TestFullNode
    PresealDir     string
    PresealSectors int

    Libp2p struct {
        PeerID peer.ID

@ -128,9 +130,9 @@ func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNoti
    }

    for {
        s, err := tm.StorageMiner.SectorsList(ctx) // Note - the test builder doesn't import genesis sectors into FSM
        s, err := tm.SectorsListNonGenesis(ctx)
        require.NoError(tm.t, err)
        fmt.Printf("Sectors: %d\n", len(s))
        fmt.Printf("Sectors: %d (n %d, ex %d)\n", len(s), n, existing)
        if len(s) >= n+existing {
            break
        }

@ -140,7 +142,7 @@ func (tm *TestMiner) StartPledge(ctx context.Context, n, existing int, blockNoti

    fmt.Printf("All sectors is fsm\n")

    s, err := tm.StorageMiner.SectorsList(ctx)
    s, err := tm.SectorsListNonGenesis(ctx)
    require.NoError(tm.t, err)

    toCheck := map[abi.SectorNumber]struct{}{}

@ -205,3 +207,15 @@ func (tm *TestMiner) AddStorage(ctx context.Context, t *testing.T, conf func(*pa

    return cfg.ID
}

func (tm *TestMiner) SectorsListNonGenesis(ctx context.Context) ([]abi.SectorNumber, error) {
    l, err := tm.SectorsList(ctx)
    if err != nil {
        return nil, err
    }

    // sort just in case
    sort.Slice(l, func(i, j int) bool {
        return l[i] < l[j]
    })

    return l[tm.PresealSectors:], nil
}
@ -2,13 +2,21 @@ package kit

import (
    "context"
    "encoding/json"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "path/filepath"
    "testing"

    "github.com/google/uuid"
    "github.com/multiformats/go-multiaddr"
    "github.com/stretchr/testify/require"

    "github.com/filecoin-project/lotus/api"
    "github.com/filecoin-project/lotus/storage/paths"
    "github.com/filecoin-project/lotus/storage/sealer/storiface"
)

// TestWorker represents a worker enrolled in an Ensemble.

@ -29,3 +37,42 @@ type TestWorker struct {

    options nodeOpts
}

func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID {
    p := t.TempDir()

    if err := os.MkdirAll(p, 0755); err != nil {
        if !os.IsExist(err) {
            require.NoError(t, err)
        }
    }

    _, err := os.Stat(filepath.Join(p, metaFile))
    if !os.IsNotExist(err) {
        require.NoError(t, err)
    }

    cfg := &paths.LocalStorageMeta{
        ID:       storiface.ID(uuid.New().String()),
        Weight:   10,
        CanSeal:  false,
        CanStore: false,
    }

    conf(cfg)

    if !(cfg.CanStore || cfg.CanSeal) {
        t.Fatal("must specify at least one of CanStore or CanSeal")
    }

    b, err := json.MarshalIndent(cfg, "", "  ")
    require.NoError(t, err)

    err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644)
    require.NoError(t, err)

    err = tm.StorageAddLocal(ctx, p)
    require.NoError(t, err)

    return cfg.ID
}
413 itests/path_detach_redeclare_test.go Normal file
@ -0,0 +1,413 @@
package itests

import (
    "context"
    "os"
    "os/exec"
    "path/filepath"
    "testing"

    logging "github.com/ipfs/go-log/v2"
    "github.com/stretchr/testify/require"

    "github.com/filecoin-project/go-address"
    "github.com/filecoin-project/go-state-types/abi"

    "github.com/filecoin-project/lotus/chain/types"
    "github.com/filecoin-project/lotus/itests/kit"
    "github.com/filecoin-project/lotus/storage/paths"
    "github.com/filecoin-project/lotus/storage/sealer/sealtasks"
    "github.com/filecoin-project/lotus/storage/sealer/storiface"
)

func TestPathDetachRedeclare(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    _ = logging.SetLogLevel("storageminer", "INFO")

    var (
        client   kit.TestFullNode
        miner    kit.TestMiner
        wiw, wdw kit.TestWorker
    )
    ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)).
        FullNode(&client, kit.ThroughRPC()).
        Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()).
        Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})).
        Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})).
        Start()

    ens.InterconnectAll()

    // check there's only one path
    sps, err := miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)

    var id storiface.ID
    for s := range sps {
        id = s
    }

    local, err := miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[id]), 1)

    oldLocal := local[id]

    // check sectors
    checkSectors(ctx, t, client, miner, 2, 0)

    // detach preseal path
    require.NoError(t, miner.StorageDetachLocal(ctx, local[id]))

    // check that there are no paths, post checks fail
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 0)
    local, err = miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 0)

    checkSectors(ctx, t, client, miner, 2, 2)

    // attach a new path
    newId := miner.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
        cfg.CanStore = true
    })

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    local, err = miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[newId]), 1)

    newLocal := local[newId]

    // move sector data to the new path

    // note: dest path already exists so we only want to .Join src
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "sealed"), newLocal).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "cache"), newLocal).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "unsealed"), newLocal).Run())

    // check that sector files aren't indexed, post checks fail
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 0)

    // redeclare sectors
    require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, false))

    // check that sector files exist, post checks work
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 2)

    checkSectors(ctx, t, client, miner, 2, 0)

    // remove one sector, one post check fails
    require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "sealed", "s-t01000-0")))
    require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "cache", "s-t01000-0")))
    checkSectors(ctx, t, client, miner, 2, 1)

    // redeclare with no drop, still see sector in the index
    require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, false))

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 2)

    // redeclare with drop, don't see the sector in the index
    require.NoError(t, miner.StorageRedeclareLocal(ctx, nil, true))

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 1)
    require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number)
}

func TestPathDetachRedeclareWorker(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    _ = logging.SetLogLevel("storageminer", "INFO")

    var (
        client          kit.TestFullNode
        miner           kit.TestMiner
        wiw, wdw, sealw kit.TestWorker
    )
    ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)).
        FullNode(&client, kit.ThroughRPC()).
        Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()).
        Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})).
        Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})).
        Worker(&miner, &sealw, kit.ThroughRPC(), kit.NoStorage(), kit.WithSealWorkerTasks).
        Start()

    ens.InterconnectAll()

    // check there's only one path on the miner, none on the worker
    sps, err := miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)

    var id storiface.ID
    for s := range sps {
        id = s
    }

    local, err := miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[id]), 1)

    oldLocal := local[id]

    local, err = sealw.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 0)

    // check sectors
    checkSectors(ctx, t, client, miner, 2, 0)

    // detach preseal path from the miner
    require.NoError(t, miner.StorageDetachLocal(ctx, oldLocal))

    // check that there are no paths, post checks fail
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 0)
    local, err = miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 0)

    checkSectors(ctx, t, client, miner, 2, 2)

    // attach a new path
    newId := sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
        cfg.CanStore = true
    })

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    local, err = sealw.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[newId]), 1)

    newLocalTemp := local[newId]

    // move sector data to the new path

    // note: dest path already exists so we only want to .Join src
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "sealed"), newLocalTemp).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "cache"), newLocalTemp).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "unsealed"), newLocalTemp).Run())

    // check that sector files aren't indexed, post checks fail
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 0)

    // redeclare sectors
    require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false))

    // check that sector files exist, post checks work
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 2)

    checkSectors(ctx, t, client, miner, 2, 0)

    // drop the path from the worker
    require.NoError(t, sealw.StorageDetachLocal(ctx, newLocalTemp))
    local, err = sealw.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 0)

    // add a new one again, and move the sectors there
    newId = sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) {
        cfg.CanStore = true
    })

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    local, err = sealw.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[newId]), 1)

    newLocal := local[newId]

    // move sector data to the new path

    // note: dest path already exists so we only want to .Join src
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "sealed"), newLocal).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "cache"), newLocal).Run())
    require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "unsealed"), newLocal).Run())

    // redeclare sectors
    require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false))

    // check that sector files exist, post checks work
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 2)

    checkSectors(ctx, t, client, miner, 2, 0)

    // remove one sector, one check fails
    require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "sealed", "s-t01000-0")))
    require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "cache", "s-t01000-0")))
    checkSectors(ctx, t, client, miner, 2, 1)

    // redeclare with no drop, still see sector in the index
    require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false))

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 2)

    // redeclare with drop, don't see the sector in the index
    require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, true))

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)
    require.Len(t, sps[newId], 1)
    require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number)
}

func TestPathDetachShared(t *testing.T) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    _ = logging.SetLogLevel("storageminer", "INFO")

    var (
        client          kit.TestFullNode
        miner           kit.TestMiner
        wiw, wdw, sealw kit.TestWorker
    )
    ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)).
        FullNode(&client, kit.ThroughRPC()).
        Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()).
        Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})).
        Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})).
        Worker(&miner, &sealw, kit.ThroughRPC(), kit.NoStorage(), kit.WithSealWorkerTasks).
        Start()

    ens.InterconnectAll()

    // check there's only one path on the miner, none on the worker
    sps, err := miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)

    var id storiface.ID
    for s := range sps {
        id = s
    }

    // check that there's only one URL for the path (provided by the miner node)
    si, err := miner.StorageInfo(ctx, id)
    require.NoError(t, err)
    require.Len(t, si.URLs, 1)

    local, err := miner.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 1)
    require.Greater(t, len(local[id]), 1)

    minerLocal := local[id]

    local, err = sealw.StorageLocal(ctx)
    require.NoError(t, err)
    require.Len(t, local, 0)

    // share the genesis sector path with the worker
    require.NoError(t, sealw.StorageAddLocal(ctx, minerLocal))

    // still just one path, but accessible from two places
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)

    // should see 2 urls now
    si, err = miner.StorageInfo(ctx, id)
    require.NoError(t, err)
    require.Len(t, si.URLs, 2)

    // drop the path from the worker
    require.NoError(t, sealw.StorageDetachLocal(ctx, minerLocal))

    // the path is still registered
    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 1)

    // but with just one URL (the miner)
    si, err = miner.StorageInfo(ctx, id)
    require.NoError(t, err)
    require.Len(t, si.URLs, 1)

    // now also drop from the miner and check that the path is gone
    require.NoError(t, miner.StorageDetachLocal(ctx, minerLocal))

    sps, err = miner.StorageList(ctx)
    require.NoError(t, err)
    require.Len(t, sps, 0)
}

func checkSectors(ctx context.Context, t *testing.T, api kit.TestFullNode, miner kit.TestMiner, expectChecked, expectBad int) {
    addr, err := miner.ActorAddress(ctx)
    require.NoError(t, err)

    mid, err := address.IDFromAddress(addr)
    require.NoError(t, err)

    info, err := api.StateMinerInfo(ctx, addr, types.EmptyTSK)
    require.NoError(t, err)

    partitions, err := api.StateMinerPartitions(ctx, addr, 0, types.EmptyTSK)
    require.NoError(t, err)
    par := partitions[0]

    sectorInfos, err := api.StateMinerSectors(ctx, addr, &par.LiveSectors, types.EmptyTSK)
    require.NoError(t, err)

    var tocheck []storiface.SectorRef
    for _, info := range sectorInfos {
        si := abi.SectorID{
            Miner:  abi.ActorID(mid),
            Number: info.SectorNumber,
        }

        tocheck = append(tocheck, storiface.SectorRef{
            ProofType: info.SealProof,
            ID:        si,
        })
    }

    require.Len(t, tocheck, expectChecked)

    bad, err := miner.CheckProvable(ctx, info.WindowPoStProofType, tocheck, true)
    require.NoError(t, err)
    require.Len(t, bad, expectBad)
}
@ -95,7 +95,7 @@ func TestSDRUpgrade(t *testing.T) {
    // before.
    miner.PledgeSectors(ctx, 9, 0, pledge)

    s, err := miner.SectorsList(ctx)
    s, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    sort.Slice(s, func(i, j int) bool {
        return s[i] < s[j]
@ -34,10 +34,10 @@ func TestMakeAvailable(t *testing.T) {
        t.Fatal(err)
    }

    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner)

    miner.PledgeSectors(ctx, 1, 0, nil)
    sl, err := miner.SectorsList(ctx)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
    require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")

@ -48,7 +48,7 @@ func TestMakeAvailable(t *testing.T) {
    }
    client.WaitForSectorActive(ctx, t, CCUpgrade, maddr)

    sl, err = miner.SectorsList(ctx)
    sl, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")

@ -64,7 +64,7 @@ func TestMakeAvailable(t *testing.T) {
    outPath := dh.PerformRetrieval(context.Background(), deal, res.Root, false)
    kit.AssertFilesEqual(t, inPath, outPath)

    sl, err = miner.SectorsList(ctx)
    sl, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
@ -96,7 +96,7 @@ func TestMinerBalanceCollateral(t *testing.T) {
    }

    // check that sector messages had zero value set
    sl, err := miner.SectorsList(ctx)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)

    for _, number := range sl {
@ -34,12 +34,12 @@ func TestPreferNoUpgrade(t *testing.T) {
        t.Fatal(err)
    }

    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
    Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 2)
    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner)
    Sealed := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)

    {
        miner.PledgeSectors(ctx, 1, 0, nil)
        sl, err := miner.SectorsList(ctx)
        sl, err := miner.SectorsListNonGenesis(ctx)
        require.NoError(t, err)
        require.Len(t, sl, 1, "expected 1 sector")
        require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")

@ -53,7 +53,7 @@ func TestPreferNoUpgrade(t *testing.T) {
        err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
        require.NoError(t, err)

        sl, err = miner.SectorsList(ctx)
        sl, err = miner.SectorsListNonGenesis(ctx)
        require.NoError(t, err)
        require.Len(t, sl, 1, "expected 1 sector")
    }

@ -68,7 +68,7 @@ func TestPreferNoUpgrade(t *testing.T) {
        kit.AssertFilesEqual(t, inPath, outPath)
    }

    sl, err := miner.SectorsList(ctx)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 2, "expected 2 sectors")
@ -31,11 +31,11 @@ func TestAbortUpgradeAvailable(t *testing.T) {
        t.Fatal(err)
    }

    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
    CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner)
    fmt.Printf("CCUpgrade: %d\n", CCUpgrade)

    miner.PledgeSectors(ctx, 1, 0, nil)
    sl, err := miner.SectorsList(ctx)
    sl, err := miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
    require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")

@ -49,7 +49,7 @@ func TestAbortUpgradeAvailable(t *testing.T) {
    err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
    require.NoError(t, err)

    sl, err = miner.SectorsList(ctx)
    sl, err = miner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    require.Len(t, sl, 1, "expected 1 sector")
@ -94,7 +94,7 @@ func TestWindowPostDispute(t *testing.T) {
    require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)))

    //stm: @MINER_SECTOR_LIST_001
    evilSectors, err := evilMiner.SectorsList(ctx)
    evilSectors, err := evilMiner.SectorsListNonGenesis(ctx)
    require.NoError(t, err)
    evilSectorNo := evilSectors[0] // only one.
    //stm: @CHAIN_STATE_SECTOR_PARTITION_001
@ -1213,6 +1213,22 @@ func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) err
    return sm.StorageMgr.AddLocalStorage(ctx, path)
}

func (sm *StorageMinerAPI) StorageDetachLocal(ctx context.Context, path string) error {
    if sm.StorageMgr == nil {
        return xerrors.Errorf("no storage manager")
    }

    return sm.StorageMgr.DetachLocalStorage(ctx, path)
}

func (sm *StorageMinerAPI) StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error {
    if sm.StorageMgr == nil {
        return xerrors.Errorf("no storage manager")
    }

    return sm.StorageMgr.RedeclareLocalStorage(ctx, id, dropMissing)
}

func (sm *StorageMinerAPI) PiecesListPieces(ctx context.Context) ([]cid.Cid, error) {
    return sm.PieceStore.ListPieceInfoKeys()
}
@ -30,6 +30,7 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5

type SectorIndex interface { // part of storage-miner api
    StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error
    StorageDetach(ctx context.Context, id storiface.ID, url string) error
    StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error)
    StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error
@ -217,6 +218,94 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
|
||||
i.lk.Lock()
|
||||
defer i.lk.Unlock()
|
||||
|
||||
// ent: *storageEntry
|
||||
ent, ok := i.stores[id]
|
||||
if !ok {
|
||||
return xerrors.Errorf("storage '%s' isn't registered", id)
|
||||
}
|
||||
|
||||
// check if this is the only path provider/url for this pathID
|
||||
drop := true
|
||||
if len(ent.info.URLs) > 0 {
|
||||
drop = len(ent.info.URLs) == 1 // only one url
|
||||
|
||||
if drop && ent.info.URLs[0] != url {
|
||||
return xerrors.Errorf("not dropping path, requested and index urls don't match ('%s' != '%s')", url, ent.info.URLs[0])
|
||||
}
|
||||
}
|
||||
|
||||
if drop {
|
||||
if a, hasAlert := i.pathAlerts[id]; hasAlert && i.alerting != nil {
|
||||
if i.alerting.IsRaised(a) {
|
||||
i.alerting.Resolve(a, map[string]string{
|
||||
"message": "path detached",
|
||||
})
|
||||
}
|
||||
delete(i.pathAlerts, id)
|
||||
}
|
||||
|
||||
// stats
|
||||
var droppedEntries, primaryEntries, droppedDecls int
|
||||
|
||||
// drop declarations
|
||||
for decl, dms := range i.sectors {
|
||||
var match bool
|
||||
for _, dm := range dms {
|
||||
if dm.storage == id {
|
||||
match = true
|
||||
droppedEntries++
|
||||
if dm.primary {
|
||||
primaryEntries++
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if no entries match, nothing to do here
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
|
||||
// if there's a match, and only one entry, drop the whole declaration
|
||||
if len(dms) <= 1 {
|
||||
delete(i.sectors, decl)
|
||||
droppedDecls++
|
||||
continue
|
||||
}
|
||||
|
||||
// rewrite entries with the path we're dropping filtered out
|
||||
filtered := make([]*declMeta, 0, len(dms)-1)
|
||||
for _, dm := range dms {
|
||||
if dm.storage != id {
|
||||
filtered = append(filtered, dm)
|
||||
}
|
||||
}
|
||||
|
||||
i.sectors[decl] = filtered
|
||||
}
|
||||
|
||||
delete(i.stores, id)
|
||||
|
||||
log.Warnw("Dropping sector storage", "path", id, "url", url, "droppedEntries", droppedEntries, "droppedPrimaryEntries", primaryEntries, "droppedDecls", droppedDecls)
|
||||
} else {
|
||||
newUrls := make([]string, 0, len(ent.info.URLs))
|
||||
for _, u := range ent.info.URLs {
|
||||
if u != url {
|
||||
newUrls = append(newUrls, u)
|
||||
}
|
||||
}
|
||||
ent.info.URLs = newUrls
|
||||
|
||||
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
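
// Illustrative sketch (not part of the upstream change): the URL handling
// above in miniature. A path entry is only dropped outright when the detached
// URL is its last remaining endpoint; otherwise the URL is merely filtered
// out. (The real code additionally errors out when the single remaining URL
// doesn't match the one being detached.)
package main

import "fmt"

func detachURL(urls []string, url string) (remaining []string, dropPath bool) {
	if len(urls) == 1 && urls[0] == url {
		return nil, true // last endpoint: drop the path and its declarations
	}
	for _, u := range urls {
		if u != url {
			remaining = append(remaining, u)
		}
	}
	return remaining, false
}

func main() {
	fmt.Println(detachURL([]string{"http://a/remote", "http://b/remote"}, "http://a/remote")) // [http://b/remote] false
	fmt.Println(detachURL([]string{"http://b/remote"}, "http://b/remote"))                    // [] true
}
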
func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
	i.lk.Lock()
	defer i.lk.Unlock()

@ -218,6 +218,10 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
		return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
	}

	if p, exists := st.paths[meta.ID]; exists {
		return xerrors.Errorf("path with ID %s already opened: '%s'", meta.ID, p.local)
	}

	// TODO: Check existing / dedupe

	out := &path{

@ -249,7 +253,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
		return xerrors.Errorf("declaring storage in index: %w", err)
	}

	if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore); err != nil {
	if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore, false); err != nil {
		return err
	}

@ -258,6 +262,25 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
	return nil
}

func (st *Local) ClosePath(ctx context.Context, id storiface.ID) error {
	st.localLk.Lock()
	defer st.localLk.Unlock()

	if _, exists := st.paths[id]; !exists {
		return xerrors.Errorf("path with ID %s isn't opened", id)
	}

	for _, url := range st.urls {
		if err := st.index.StorageDetach(ctx, id, url); err != nil {
			return xerrors.Errorf("dropping path (id='%s' url='%s'): %w", id, url, err)
		}
	}

	delete(st.paths, id)

	return nil
}
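
// Illustrative sketch (not part of the upstream change): ClosePath is the
// low-level half of detaching. It unregisters an open path from the index
// (one StorageDetach per advertised URL) and forgets it locally, but leaves
// storage.json untouched; Manager.DetachLocalStorage (further down in this
// change) wraps it and also rewrites the config. `closeByID` is a
// hypothetical helper.
func closeByID(ctx context.Context, st *Local, id storiface.ID) error {
	return st.ClosePath(ctx, id) // the path can be re-opened later via OpenPath
}
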
func (st *Local) open(ctx context.Context) error {
	cfg, err := st.localStorage.GetStorage()
	if err != nil {

@ -276,7 +299,7 @@ func (st *Local) open(ctx context.Context) error {
	return nil
}

func (st *Local) Redeclare(ctx context.Context) error {
func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMissingDecls bool) error {
	st.localLk.Lock()
	defer st.localLk.Unlock()

@ -300,6 +323,9 @@ func (st *Local) Redeclare(ctx context.Context) error {
			log.Errorf("storage path ID changed: %s; %s -> %s", p.local, id, meta.ID)
			continue
		}
		if filterId != nil && *filterId != id {
			continue
		}

		err = st.index.StorageAttach(ctx, storiface.StorageInfo{
			ID: id,

@ -317,7 +343,7 @@ func (st *Local) Redeclare(ctx context.Context) error {
			return xerrors.Errorf("redeclaring storage in index: %w", err)
		}

		if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore); err != nil {
		if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore, dropMissingDecls); err != nil {
			return xerrors.Errorf("redeclaring sectors: %w", err)
		}
	}

@ -325,7 +351,24 @@ func (st *Local) Redeclare(ctx context.Context) error {
	return nil
}

func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary bool) error {
func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary, dropMissing bool) error {
	indexed := map[storiface.Decl]struct{}{}
	if dropMissing {
		decls, err := st.index.StorageList(ctx)
		if err != nil {
			return xerrors.Errorf("getting declaration list: %w", err)
		}

		for _, decl := range decls[id] {
			for _, fileType := range decl.SectorFileType.AllSet() {
				indexed[storiface.Decl{
					SectorID:       decl.SectorID,
					SectorFileType: fileType,
				}] = struct{}{}
			}
		}
	}

	for _, t := range storiface.PathTypes {
		ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
		if err != nil {

@ -349,12 +392,29 @@ func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID,
				return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
			}

			delete(indexed, storiface.Decl{
				SectorID:       sid,
				SectorFileType: t,
			})

			if err := st.index.StorageDeclareSector(ctx, id, sid, t, primary); err != nil {
				return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, id, err)
			}
		}
	}

	if len(indexed) > 0 {
		log.Warnw("index contains sectors which are missing in the storage path", "count", len(indexed), "dropMissing", dropMissing)
	}

	if dropMissing {
		for decl := range indexed {
			if err := st.index.StorageDropSector(ctx, id, decl.SectorID, decl.SectorFileType); err != nil {
				return xerrors.Errorf("dropping sector %v from index: %w", decl, err)
			}
		}
	}

	return nil
}
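
// Illustrative sketch (not part of the upstream change): the dropMissing
// bookkeeping above is a plain set difference. Start from everything the
// index currently attributes to this path, delete each decl actually found
// on disk, and whatever remains is stale. In miniature:
package main

import "fmt"

func staleDecls(indexed, onDisk []string) []string {
	set := map[string]struct{}{}
	for _, d := range indexed {
		set[d] = struct{}{}
	}
	for _, d := range onDisk {
		delete(set, d) // found on disk: not stale
	}
	var stale []string
	for d := range set {
		stale = append(stale, d)
	}
	return stale
}

func main() {
	// "s-t01000-1/sealed" exists in the index but not on disk -> stale, dropped
	fmt.Println(staleDecls(
		[]string{"s-t01000-1/sealed", "s-t01000-1/cache"},
		[]string{"s-t01000-1/cache"},
	))
}
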

@ -82,6 +82,20 @@ func (mr *MockSectorIndexMockRecorder) StorageDeclareSector(arg0, arg1, arg2, ar
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), arg0, arg1, arg2, arg3, arg4)
}

// StorageDetach mocks base method.
func (m *MockSectorIndex) StorageDetach(arg0 context.Context, arg1 storiface.ID, arg2 string) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "StorageDetach", arg0, arg1, arg2)
	ret0, _ := ret[0].(error)
	return ret0
}

// StorageDetach indicates an expected call of StorageDetach.
func (mr *MockSectorIndexMockRecorder) StorageDetach(arg0, arg1, arg2 interface{}) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDetach", reflect.TypeOf((*MockSectorIndex)(nil).StorageDetach), arg0, arg1, arg2)
}
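
// Illustrative test sketch (not part of the upstream change), assuming the
// generated NewMockSectorIndex constructor and the usual gomock/testify
// imports: the mock above lets index callers be tested without a real Index.
func TestStorageDetachMock(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	idx := mocks.NewMockSectorIndex(ctrl)
	id := storiface.ID("00000000-0000-0000-0000-000000000000")

	// Expect exactly one detach for this path/URL pair.
	idx.EXPECT().StorageDetach(gomock.Any(), id, "http://host-a/remote").Return(nil)

	require.NoError(t, idx.StorageDetach(context.Background(), id, "http://host-a/remote"))
}
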
// StorageDropSector mocks base method.
func (m *MockSectorIndex) StorageDropSector(arg0 context.Context, arg1 storiface.ID, arg2 abi.SectorID, arg3 storiface.SectorFileType) error {
	m.ctrl.T.Helper()

@ -238,6 +238,58 @@ func (m *Manager) AddLocalStorage(ctx context.Context, path string) error {
	return nil
}

func (m *Manager) DetachLocalStorage(ctx context.Context, path string) error {
	path, err := homedir.Expand(path)
	if err != nil {
		return xerrors.Errorf("expanding local path: %w", err)
	}

	// check that we have the path opened
	lps, err := m.localStore.Local(ctx)
	if err != nil {
		return xerrors.Errorf("getting local path list: %w", err)
	}

	var localPath *storiface.StoragePath
	for _, lp := range lps {
		if lp.LocalPath == path {
			lp := lp // copy to make the linter happy
			localPath = &lp
			break
		}
	}
	if localPath == nil {
		return xerrors.Errorf("no local paths match '%s'", path)
	}

	// drop from the persisted storage.json
	var found bool
	if err := m.ls.SetStorage(func(sc *paths.StorageConfig) {
		out := make([]paths.LocalPath, 0, len(sc.StoragePaths))
		for _, storagePath := range sc.StoragePaths {
			if storagePath.Path != path {
				out = append(out, storagePath)
				continue
			}
			found = true
		}
		sc.StoragePaths = out
	}); err != nil {
		return xerrors.Errorf("set storage config: %w", err)
	}
	if !found {
		// maybe this is fine?
		return xerrors.Errorf("path not found in storage.json")
	}

	// unregister locally, drop from sector index
	return m.localStore.ClosePath(ctx, localPath.ID)
}

func (m *Manager) RedeclareLocalStorage(ctx context.Context, id *storiface.ID, dropMissing bool) error {
	return m.localStore.Redeclare(ctx, id, dropMissing)
}
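
// Illustrative sketch (not part of the upstream change): passing a non-nil ID
// restricts the redeclare to a single path, while dropMissing=false only logs
// stale index entries instead of dropping them. `redeclareOne` is a
// hypothetical helper.
func redeclareOne(ctx context.Context, mgr *Manager, id storiface.ID) error {
	return mgr.RedeclareLocalStorage(ctx, &id, false)
}
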
func (m *Manager) AddWorker(ctx context.Context, w Worker) error {
	sessID, err := w.Session(ctx)
	if err != nil {

@ -99,6 +99,18 @@ func (t SectorFileType) Strings() []string {
	return out
}

func (t SectorFileType) AllSet() []SectorFileType {
	var out []SectorFileType
	for _, fileType := range PathTypes {
		if fileType&t == 0 {
			continue
		}

		out = append(out, fileType)
	}
	return out
}
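
// Illustrative sketch (not part of the upstream change): AllSet decomposes a
// combined SectorFileType bitmask into its single-type members. Assumes the
// fmt import; the output comments reflect the usual String() names for the
// file types.
func ExampleSectorFileType_AllSet() {
	fmt.Println((FTSealed | FTCache).AllSet()) // e.g. [sealed cache]
	fmt.Println(FTUnsealed.AllSet())           // e.g. [unsealed]
}
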
func (t SectorFileType) Has(singleType SectorFileType) bool {
	return t&singleType == singleType
}