storage: StorageRedeclareLocal on lotus-worker
This commit is contained in:
parent
e252e666bd
commit
943d2a72c6
@ -178,7 +178,7 @@ type StorageMiner interface {
|
||||
|
||||
StorageAddLocal(ctx context.Context, path string) error //perm:admin
|
||||
//StorageDetachLocal(ctx context.Context, path string) error //perm:admin
|
||||
//StorageRedeclareLocal(ctx context.Context, id storiface.ID) error //perm:admin
|
||||
//StorageRedeclareLocal(ctx context.Context, id storiface.ID, dropMissing bool) error //perm:admin
|
||||
|
||||
MarketImportDealData(ctx context.Context, propcid cid.Cid, path string) error //perm:write
|
||||
MarketListDeals(ctx context.Context) ([]*MarketDeal, error) //perm:read
|
||||
|
@ -59,9 +59,9 @@ type Worker interface {
|
||||
// Storage / Other
|
||||
Remove(ctx context.Context, sector abi.SectorID) error //perm:admin
|
||||
|
||||
StorageAddLocal(ctx context.Context, path string) error //perm:admin
|
||||
StorageDetachLocal(ctx context.Context, path string) error //perm:admin
|
||||
StorageRedeclareLocal(ctx context.Context, id storiface.ID) error //perm:admin
|
||||
StorageAddLocal(ctx context.Context, path string) error //perm:admin
|
||||
StorageDetachLocal(ctx context.Context, path string) error //perm:admin
|
||||
StorageRedeclareLocal(ctx context.Context, id storiface.ID, dropMissing bool) error //perm:admin
|
||||
|
||||
// SetEnabled marks the worker as enabled/disabled. Not that this setting
|
||||
// may take a few seconds to propagate to task scheduler
|
||||
|
@ -7,6 +7,15 @@ import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-libp2p-core/metrics"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
@ -21,6 +30,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
abinetwork "github.com/filecoin-project/go-state-types/network"
|
||||
"github.com/filecoin-project/go-state-types/proof"
|
||||
|
||||
apitypes "github.com/filecoin-project/lotus/api/types"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin"
|
||||
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
@ -32,14 +42,6 @@ import (
|
||||
"github.com/filecoin-project/lotus/storage/sealer/fsutil"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/sealtasks"
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
"github.com/google/uuid"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-libp2p-core/metrics"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var ErrNotSupported = xerrors.New("method not supported")
|
||||
@ -969,7 +971,7 @@ type WorkerStruct struct {
|
||||
|
||||
StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"`
|
||||
|
||||
StorageRedeclareLocal func(p0 context.Context, p1 storiface.ID) error `perm:"admin"`
|
||||
StorageRedeclareLocal func(p0 context.Context, p1 storiface.ID, p2 bool) error `perm:"admin"`
|
||||
|
||||
TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"`
|
||||
|
||||
@ -5597,14 +5599,14 @@ func (s *WorkerStub) StorageDetachLocal(p0 context.Context, p1 string) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID) error {
|
||||
func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID, p2 bool) error {
|
||||
if s.Internal.StorageRedeclareLocal == nil {
|
||||
return ErrNotSupported
|
||||
}
|
||||
return s.Internal.StorageRedeclareLocal(p0, p1)
|
||||
return s.Internal.StorageRedeclareLocal(p0, p1, p2)
|
||||
}
|
||||
|
||||
func (s *WorkerStub) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID) error {
|
||||
func (s *WorkerStub) StorageRedeclareLocal(p0 context.Context, p1 storiface.ID, p2 bool) error {
|
||||
return ErrNotSupported
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,11 @@ package v0api
|
||||
import (
|
||||
"context"
|
||||
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
datatransfer "github.com/filecoin-project/go-data-transfer"
|
||||
@ -16,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
abinetwork "github.com/filecoin-project/go-state-types/network"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
apitypes "github.com/filecoin-project/lotus/api/types"
|
||||
lminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
@ -23,10 +29,6 @@ import (
|
||||
marketevents "github.com/filecoin-project/lotus/markets/loggers"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
"github.com/filecoin-project/lotus/node/repo/imports"
|
||||
blocks "github.com/ipfs/go-block-format"
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var ErrNotSupported = xerrors.New("method not supported")
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -130,9 +130,8 @@ func (w *Worker) StorageDetachLocal(ctx context.Context, path string) error {
|
||||
return w.LocalStore.ClosePath(ctx, localPath.ID)
|
||||
}
|
||||
|
||||
func (w *Worker) StorageRedeclareLocal(ctx context.Context, id storiface.ID) error {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
func (w *Worker) StorageRedeclareLocal(ctx context.Context, id storiface.ID, dropMissing bool) error {
|
||||
return w.LocalStore.Redeclare(ctx, &id, dropMissing)
|
||||
}
|
||||
|
||||
func (w *Worker) SetEnabled(ctx context.Context, enabled bool) error {
|
||||
|
@ -160,6 +160,7 @@
|
||||
* [StorageAuthVerify](#StorageAuthVerify)
|
||||
* [StorageBestAlloc](#StorageBestAlloc)
|
||||
* [StorageDeclareSector](#StorageDeclareSector)
|
||||
* [StorageDetach](#StorageDetach)
|
||||
* [StorageDropSector](#StorageDropSector)
|
||||
* [StorageFindSector](#StorageFindSector)
|
||||
* [StorageGetLocks](#StorageGetLocks)
|
||||
@ -3398,6 +3399,21 @@ Inputs:
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### StorageDetach
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
"76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8",
|
||||
"string value"
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### StorageDropSector
|
||||
|
||||
|
||||
|
@ -2131,7 +2131,8 @@ Perms: admin
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
"76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8"
|
||||
"76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8",
|
||||
true
|
||||
]
|
||||
```
|
||||
|
||||
|
@ -253,7 +253,7 @@ func (st *Local) OpenPath(ctx context.Context, p string) error {
|
||||
return xerrors.Errorf("declaring storage in index: %w", err)
|
||||
}
|
||||
|
||||
if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore); err != nil {
|
||||
if err := st.declareSectors(ctx, p, meta.ID, meta.CanStore, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -299,7 +299,7 @@ func (st *Local) open(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *Local) Redeclare(ctx context.Context) error {
|
||||
func (st *Local) Redeclare(ctx context.Context, filterId *storiface.ID, dropMissingDecls bool) error {
|
||||
st.localLk.Lock()
|
||||
defer st.localLk.Unlock()
|
||||
|
||||
@ -323,6 +323,9 @@ func (st *Local) Redeclare(ctx context.Context) error {
|
||||
log.Errorf("storage path ID changed: %s; %s -> %s", p.local, id, meta.ID)
|
||||
continue
|
||||
}
|
||||
if filterId != nil && *filterId != id {
|
||||
continue
|
||||
}
|
||||
|
||||
err = st.index.StorageAttach(ctx, storiface.StorageInfo{
|
||||
ID: id,
|
||||
@ -340,7 +343,7 @@ func (st *Local) Redeclare(ctx context.Context) error {
|
||||
return xerrors.Errorf("redeclaring storage in index: %w", err)
|
||||
}
|
||||
|
||||
if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore); err != nil {
|
||||
if err := st.declareSectors(ctx, p.local, meta.ID, meta.CanStore, dropMissingDecls); err != nil {
|
||||
return xerrors.Errorf("redeclaring sectors: %w", err)
|
||||
}
|
||||
}
|
||||
@ -348,7 +351,19 @@ func (st *Local) Redeclare(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary bool) error {
|
||||
func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID, primary, dropMissing bool) error {
|
||||
indexed := map[storiface.Decl]struct{}{}
|
||||
if dropMissing {
|
||||
decls, err := st.index.StorageList(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting declaration list: %w", err)
|
||||
}
|
||||
|
||||
for _, decl := range decls[id] {
|
||||
indexed[decl] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
for _, t := range storiface.PathTypes {
|
||||
ents, err := ioutil.ReadDir(filepath.Join(p, t.String()))
|
||||
if err != nil {
|
||||
@ -372,12 +387,29 @@ func (st *Local) declareSectors(ctx context.Context, p string, id storiface.ID,
|
||||
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
|
||||
}
|
||||
|
||||
delete(indexed, storiface.Decl{
|
||||
SectorID: sid,
|
||||
SectorFileType: t,
|
||||
})
|
||||
|
||||
if err := st.index.StorageDeclareSector(ctx, id, sid, t, primary); err != nil {
|
||||
return xerrors.Errorf("declare sector %d(t:%d) -> %s: %w", sid, t, id, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(indexed) > 0 {
|
||||
log.Warnw("index contains sectors which are missing in the storage path", "count", len(indexed), "dropMissing", dropMissing)
|
||||
}
|
||||
|
||||
if dropMissing {
|
||||
for decl := range indexed {
|
||||
if err := st.index.StorageDropSector(ctx, id, decl.SectorID, decl.SectorFileType); err != nil {
|
||||
return xerrors.Errorf("dropping sector %v from index: %w", decl, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user