wip storage redeclare/detach

This commit is contained in:
Łukasz Magiera 2022-07-13 20:09:21 +02:00
parent a843c52e38
commit f947e65f15
13 changed files with 243 additions and 5 deletions

View File

@ -175,6 +175,10 @@ type StorageMiner interface {
StorageAuthVerify(ctx context.Context, token string) ([]auth.Permission, error) //perm:read StorageAuthVerify(ctx context.Context, token string) ([]auth.Permission, error) //perm:read
StorageAddLocal(ctx context.Context, path string) error //perm:admin
//StorageDetachLocal(ctx context.Context, path string) error //perm:admin
//StorageRedeclareLocal(ctx context.Context, id storiface.ID) error //perm:admin
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
MarketListDeals(ctx context.Context) ([]*MarketDeal, error) //perm:read MarketListDeals(ctx context.Context) ([]*MarketDeal, error) //perm:read
MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read MarketListRetrievalDeals(ctx context.Context) ([]retrievalmarket.ProviderDealState, error) //perm:read
@ -272,8 +276,6 @@ type StorageMiner interface {
DealsConsiderUnverifiedStorageDeals(context.Context) (bool, error) //perm:admin DealsConsiderUnverifiedStorageDeals(context.Context) (bool, error) //perm:admin
DealsSetConsiderUnverifiedStorageDeals(context.Context, bool) error //perm:admin DealsSetConsiderUnverifiedStorageDeals(context.Context, bool) error //perm:admin
StorageAddLocal(ctx context.Context, path string) error //perm:admin
PiecesListPieces(ctx context.Context) ([]cid.Cid, error) //perm:read PiecesListPieces(ctx context.Context) ([]cid.Cid, error) //perm:read
PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) //perm:read PiecesListCidInfos(ctx context.Context) ([]cid.Cid, error) //perm:read
PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) //perm:read PiecesGetPieceInfo(ctx context.Context, pieceCid cid.Cid) (*piecestore.PieceInfo, error) //perm:read

View File

@ -59,7 +59,9 @@ type Worker interface {
// Storage / Other // Storage / Other
Remove(ctx context.Context, sector abi.SectorID) error //perm:admin Remove(ctx context.Context, sector abi.SectorID) error //perm:admin
StorageAddLocal(ctx context.Context, path string) error //perm:admin StorageAddLocal(ctx context.Context, path string) error //perm:admin
StorageDetachLocal(ctx context.Context, path string) error //perm:admin
StorageRedeclareLocal(ctx context.Context, id storiface.ID) error //perm:admin
// SetEnabled marks the worker as enabled/disabled. Not that this setting // SetEnabled marks the worker as enabled/disabled. Not that this setting
// may take a few seconds to propagate to task scheduler // may take a few seconds to propagate to task scheduler

View File

@ -850,7 +850,7 @@ type StorageMinerStruct struct {
SectorsUpdate func(p0 context.Context, p1 abi.SectorNumber, p2 SectorState) error `perm:"admin"` SectorsUpdate func(p0 context.Context, p1 abi.SectorNumber, p2 SectorState) error `perm:"admin"`
StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"` StorageAddLocal func(p0 context.Context, p1 string) error ``
StorageAttach func(p0 context.Context, p1 storiface.StorageInfo, p2 fsutil.FsStat) error `perm:"admin"` StorageAttach func(p0 context.Context, p1 storiface.StorageInfo, p2 fsutil.FsStat) error `perm:"admin"`
@ -967,6 +967,10 @@ type WorkerStruct struct {
StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"` StorageAddLocal func(p0 context.Context, p1 string) error `perm:"admin"`
StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"`
StorageRedeclareLocal func(p0 context.Context, p1 storiface.ID) error `perm:"admin"`
TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"` TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`
TaskEnable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"` TaskEnable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`
@ -5571,6 +5575,28 @@ func (s *WorkerStub) StorageAddLocal(p0 context.Context, p1 string) error {
return ErrNotSupported return ErrNotSupported
} }
func (s *WorkerStruct) StorageDetachLocal(p0 context.Context, p1 string) error {
if s.Internal.StorageDetachLocal == nil {
return ErrNotSupported
}
return s.Internal.StorageDetachLocal(p0, p1)
}
func (s *WorkerStub) StorageDetachLocal(p0 context.Context, p1 string) error {
return ErrNotSupported
}
func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID) error {
if s.Internal.StorageRedeclareLocal == nil {
return ErrNotSupported
}
return s.Internal.StorageRedeclareLocal(p0, p1)
}
func (s *WorkerStub) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID) error {
return ErrNotSupported
}
func (s *WorkerStruct) TaskDisable(p0 context.Context, p1 sealtasks.TaskType) error { func (s *WorkerStruct) TaskDisable(p0 context.Context, p1 sealtasks.TaskType) error {
if s.Internal.TaskDisable == nil { if s.Internal.TaskDisable == nil {
return ErrNotSupported return ErrNotSupported

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -84,6 +84,57 @@ func (w *Worker) StorageAddLocal(ctx context.Context, path string) error {
return nil return nil
} }
func (w *Worker) StorageDetachLocal(ctx context.Context, path string) error {
path, err := homedir.Expand(path)
if err != nil {
return xerrors.Errorf("expanding local path: %w", err)
}
// check that we have the path opened
lps, err := w.LocalStore.Local(ctx)
if err != nil {
return xerrors.Errorf("getting local path list: %w", err)
}
var localPath *storiface.StoragePath
for _, lp := range lps {
if lp.LocalPath == path {
localPath = &lp
break
}
}
if localPath == nil {
return xerrors.Errorf("no local paths match '%s'", path)
}
// drop from the persisted storage.json
var found bool
if err := w.Storage.SetStorage(func(sc *paths.StorageConfig) {
out := make([]paths.LocalPath, 0, len(sc.StoragePaths))
for _, storagePath := range sc.StoragePaths {
if storagePath.Path != path {
out = append(out, storagePath)
return
}
found = true
}
}); err != nil {
return xerrors.Errorf("set storage config: %w", err)
}
if !found {
// maybe this is fine?
return xerrors.Errorf("path not found in storage.json")
}
// unregister locally, drop from sector index
return w.LocalStore.ClosePath(ctx, localPath.ID)
}
func (w *Worker) StorageRedeclareLocal(ctx context.Context, id storiface.ID) error {
//TODO implement me
panic("implement me")
}
func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error { func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error {
disabled := int64(1) disabled := int64(1)
if enabled { if enabled {
@ -119,3 +170,4 @@ func (w *Worker) Discover(ctx context.Context) (apitypes.OpenRPCDocument, error)
} }
var _ storiface.WorkerCalls = &Worker{} var _ storiface.WorkerCalls = &Worker{}
var _ api.Worker = &Worker{}

View File

@ -3259,7 +3259,7 @@ Response: `{}`
### StorageAddLocal ### StorageAddLocal
Perms: admin Perms:
Inputs: Inputs:
```json ```json

View File

@ -38,6 +38,8 @@
* [SetEnabled](#SetEnabled) * [SetEnabled](#SetEnabled)
* [Storage](#Storage) * [Storage](#Storage)
* [StorageAddLocal](#StorageAddLocal) * [StorageAddLocal](#StorageAddLocal)
* [StorageDetachLocal](#StorageDetachLocal)
* [StorageRedeclareLocal](#StorageRedeclareLocal)
* [Task](#Task) * [Task](#Task)
* [TaskDisable](#TaskDisable) * [TaskDisable](#TaskDisable)
* [TaskEnable](#TaskEnable) * [TaskEnable](#TaskEnable)
@ -2107,6 +2109,34 @@ Inputs:
Response: `{}` Response: `{}`
### StorageDetachLocal
Perms: admin
Inputs:
```json
[
"string value"
]
```
Response: `{}`
### StorageRedeclareLocal
Perms: admin
Inputs:
```json
[
"76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8"
]
```
Response: `{}`
## Task ## Task

View File

@ -30,6 +30,7 @@ var SkippedHeartbeatThresh = HeartbeatInterval * 5
type SectorIndex interface { // part of storage-miner api type SectorIndex interface { // part of storage-miner api
StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error StorageAttach(context.Context, storiface.StorageInfo, fsutil.FsStat) error
StorageDetach(ctx context.Context, id storiface.ID, url string) error
StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error) StorageInfo(context.Context, storiface.ID) (storiface.StorageInfo, error)
StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error StorageReportHealth(context.Context, storiface.ID, storiface.HealthReport) error
@ -217,6 +218,94 @@ func (i *Index) StorageAttach(ctx context.Context, si storiface.StorageInfo, st
return nil return nil
} }
func (i *Index) StorageDetach(ctx context.Context, id storiface.ID, url string) error {
i.lk.Lock()
defer i.lk.Unlock()
// ent: *storageEntry
ent, ok := i.stores[id]
if !ok {
return xerrors.Errorf("storage '%s' isn't registerd", id)
}
// check if this is the only path provider/url for this pathID
drop := true
if len(ent.info.URLs) > 0 {
drop = len(ent.info.URLs) == 1 // only one url
if ent.info.URLs[0] != url {
return xerrors.Errorf("not dropping path, requested and index urls don't match ('%s' != '%s')", url, ent.info.URLs[0])
}
}
if drop {
if a, hasAlert := i.pathAlerts[id]; hasAlert && i.alerting != nil {
if i.alerting.IsRaised(a) {
i.alerting.Resolve(a, map[string]string{
"message": "path detached",
})
}
delete(i.pathAlerts, id)
}
// stats
var droppedEntries, primaryEntries, droppedDecls int
// drop declarations
for decl, dms := range i.sectors {
var match bool
for _, dm := range dms {
if dm.storage == id {
match = true
droppedEntries++
if dm.primary {
primaryEntries++
}
break
}
}
// if no entries match, nothing to do here
if !match {
continue
}
// if there's a match, and only one entry, drop the whole declaration
if len(dms) <= 1 {
delete(i.sectors, decl)
droppedDecls++
continue
}
// rewrite entries with the path we're dropping filtered out
filtered := make([]*declMeta, 0, len(dms)-1)
for _, dm := range dms {
if dm.storage != id {
filtered = append(filtered, dm)
}
}
i.sectors[decl] = filtered
}
delete(i.stores, id)
log.Warnw("Dropping sector storage", "path", id, "url", url, "droppedEntries", droppedEntries, "droppedPrimaryEntries", primaryEntries, "droppedDecls", droppedDecls)
} else {
newUrls := make([]string, 0, len(ent.info.URLs))
for _, u := range ent.info.URLs {
if u != url {
newUrls = append(newUrls, u)
}
}
ent.info.URLs = newUrls
log.Warnw("Dropping sector path endpoint", "path", id, "url", url)
}
return nil
}
func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error { func (i *Index) StorageReportHealth(ctx context.Context, id storiface.ID, report storiface.HealthReport) error {
i.lk.Lock() i.lk.Lock()
defer i.lk.Unlock() defer i.lk.Unlock()

View File

@ -218,6 +218,10 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
} }
if p, exists := st.paths[meta.ID]; exists {
return xerrors.Errorf("path with ID %s already opened: '%s'", meta.ID, p.local)
}
// TODO: Check existing / dedupe // TODO: Check existing / dedupe
out := &path{ out := &path{
@ -258,6 +262,25 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
return nil return nil
} }
func (st *Local) ClosePath(ctx context.Context, id storiface.ID) error {
st.localLk.Lock()
defer st.localLk.Unlock()
if _, exists := st.paths[id]; !exists {
return xerrors.Errorf("path with ID %s isn't opened")
}
for _, url := range st.urls {
if err := st.index.StorageDetach(ctx, id, url); err != nil {
return xerrors.Errorf("dropping path (id='%s' url=''): %w", id, url, err)
}
}
delete(st.paths, id)
return nil
}
func (st *Local) open(ctx context.Context) error { func (st *Local) open(ctx context.Context) error {
cfg, err := st.localStorage.GetStorage() cfg, err := st.localStorage.GetStorage()
if err != nil { if err != nil {

View File

@ -82,6 +82,20 @@ func (mr *MockSectorIndexMockRecorder) StorageDeclareSector(arg0, arg1, arg2, ar
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), arg0, arg1, arg2, arg3, arg4) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDeclareSector", reflect.TypeOf((*MockSectorIndex)(nil).StorageDeclareSector), arg0, arg1, arg2, arg3, arg4)
} }
// StorageDetach mocks base method.
func (m *MockSectorIndex) StorageDetach(arg0 context.Context, arg1 storiface.ID, arg2 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StorageDetach", arg0, arg1, arg2)
ret0, _ := ret[0].(error)
return ret0
}
// StorageDetach indicates an expected call of StorageDetach.
func (mr *MockSectorIndexMockRecorder) StorageDetach(arg0, arg1, arg2 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StorageDetach", reflect.TypeOf((*MockSectorIndex)(nil).StorageDetach), arg0, arg1, arg2)
}
// StorageDropSector mocks base method. // StorageDropSector mocks base method.
func (m *MockSectorIndex) StorageDropSector(arg0 context.Context, arg1 storiface.ID, arg2 abi.SectorID, arg3 storiface.SectorFileType) error { func (m *MockSectorIndex) StorageDropSector(arg0 context.Context, arg1 storiface.ID, arg2 abi.SectorID, arg3 storiface.SectorFileType) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()