diff --git a/api/api_worker.go b/api/api_worker.go index e59e9f922..6ebe03147 100644 --- a/api/api_worker.go +++ b/api/api_worker.go @@ -59,6 +59,7 @@ type Worker interface { // Storage / Other Remove(ctx context.Context, sector abi.SectorID) error //perm:admin + StorageLocal(ctx context.Context) (map[storiface.ID]string, error) //perm:admin StorageAddLocal(ctx context.Context, path string) error //perm:admin StorageDetachLocal(ctx context.Context, path string) error //perm:admin StorageRedeclareLocal(ctx context.Context, id *storiface.ID, dropMissing bool) error //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 961625606..9a296a4e3 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -975,6 +975,8 @@ type WorkerStruct struct { StorageDetachLocal func(p0 context.Context, p1 string) error `perm:"admin"` + StorageLocal func(p0 context.Context) (map[storiface.ID]string, error) `perm:"admin"` + StorageRedeclareLocal func(p0 context.Context, p1 *storiface.ID, p2 bool) error `perm:"admin"` TaskDisable func(p0 context.Context, p1 sealtasks.TaskType) error `perm:"admin"` @@ -5625,6 +5627,17 @@ func (s *WorkerStub) StorageDetachLocal(p0 context.Context, p1 string) error { return ErrNotSupported } +func (s *WorkerStruct) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) { + if s.Internal.StorageLocal == nil { + return *new(map[storiface.ID]string), ErrNotSupported + } + return s.Internal.StorageLocal(p0) +} + +func (s *WorkerStub) StorageLocal(p0 context.Context) (map[storiface.ID]string, error) { + return *new(map[storiface.ID]string), ErrNotSupported +} + func (s *WorkerStruct) StorageRedeclareLocal(p0 context.Context, p1 *storiface.ID, p2 bool) error { if s.Internal.StorageRedeclareLocal == nil { return ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 1cf63d4a7..6d9447322 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/gateway.json.gz b/build/openrpc/gateway.json.gz index 54fa97243..1667fa6b7 100644 Binary files a/build/openrpc/gateway.json.gz and b/build/openrpc/gateway.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 659ecc71e..8dd0e151d 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 210ef99b9..881d99256 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-worker/sealworker/rpc.go b/cmd/lotus-worker/sealworker/rpc.go index fb58aa7da..6fcb9d05d 100644 --- a/cmd/lotus-worker/sealworker/rpc.go +++ b/cmd/lotus-worker/sealworker/rpc.go @@ -65,6 +65,20 @@ func (w *Worker) Version(context.Context) (api.Version, error) { return api.WorkerAPIVersion0, nil } +func (w *Worker) StorageLocal(ctx context.Context) (map[storiface.ID]string, error) { + l, err := w.LocalStore.Local(ctx) + if err != nil { + return nil, err + } + + out := map[storiface.ID]string{} + for _, st := range l { + out[st.ID] = st.LocalPath + } + + return out, nil +} + func (w *Worker) StorageAddLocal(ctx context.Context, path string) error { path, err := homedir.Expand(path) if err != nil { diff --git a/documentation/en/api-v0-methods-worker.md b/documentation/en/api-v0-methods-worker.md index 8f7c780bc..713e3f7ae 100644 --- a/documentation/en/api-v0-methods-worker.md +++ b/documentation/en/api-v0-methods-worker.md @@ -39,6 +39,7 @@ * [Storage](#Storage) * [StorageAddLocal](#StorageAddLocal) * [StorageDetachLocal](#StorageDetachLocal) + * [StorageLocal](#StorageLocal) * [StorageRedeclareLocal](#StorageRedeclareLocal) * [Task](#Task) * [TaskDisable](#TaskDisable) @@ -2123,6 +2124,20 @@ Inputs: Response: `{}` +### StorageLocal + + +Perms: admin + +Inputs: `null` + +Response: +```json +{ + "76f1988b-ef30-4d7e-b3ec-9a627f4ba5a8": "/data/path" +} +``` + ### StorageRedeclareLocal diff --git a/itests/kit/ensemble.go b/itests/kit/ensemble.go index ef02edd79..0e28babb5 100644 --- a/itests/kit/ensemble.go +++ b/itests/kit/ensemble.go @@ -6,8 +6,6 @@ import ( "crypto/rand" "encoding/binary" "fmt" - cborutil "github.com/filecoin-project/go-cbor-util" - pipeline "github.com/filecoin-project/lotus/storage/pipeline" "io/ioutil" "net" "net/http" @@ -23,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" @@ -58,6 +57,7 @@ import ( testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/paths" + pipeline "github.com/filecoin-project/lotus/storage/pipeline" sectorstorage "github.com/filecoin-project/lotus/storage/sealer" "github.com/filecoin-project/lotus/storage/sealer/mock" "github.com/filecoin-project/lotus/storage/sealer/storiface" diff --git a/itests/kit/node_worker.go b/itests/kit/node_worker.go index 538c739a7..3a6a55c55 100644 --- a/itests/kit/node_worker.go +++ b/itests/kit/node_worker.go @@ -2,13 +2,21 @@ package kit import ( "context" + "encoding/json" + "io/ioutil" "net" "net/http" + "os" + "path/filepath" "testing" + "github.com/google/uuid" "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/storage/paths" + "github.com/filecoin-project/lotus/storage/sealer/storiface" ) // TestWorker represents a worker enrolled in an Ensemble. @@ -29,3 +37,42 @@ type TestWorker struct { options nodeOpts } + +func (tm *TestWorker) AddStorage(ctx context.Context, t *testing.T, conf func(*paths.LocalStorageMeta)) storiface.ID { + p := t.TempDir() + + if err := os.MkdirAll(p, 0755); err != nil { + if !os.IsExist(err) { + require.NoError(t, err) + } + } + + _, err := os.Stat(filepath.Join(p, metaFile)) + if !os.IsNotExist(err) { + require.NoError(t, err) + } + + cfg := &paths.LocalStorageMeta{ + ID: storiface.ID(uuid.New().String()), + Weight: 10, + CanSeal: false, + CanStore: false, + } + + conf(cfg) + + if !(cfg.CanStore || cfg.CanSeal) { + t.Fatal("must specify at least one of CanStore or cfg.CanSeal") + } + + b, err := json.MarshalIndent(cfg, "", " ") + require.NoError(t, err) + + err = ioutil.WriteFile(filepath.Join(p, metaFile), b, 0644) + require.NoError(t, err) + + err = tm.StorageAddLocal(ctx, p) + require.NoError(t, err) + + return cfg.ID +} diff --git a/itests/path_detach_redeclare_test.go b/itests/path_detach_redeclare_test.go index 60811964f..b9eab0d26 100644 --- a/itests/path_detach_redeclare_test.go +++ b/itests/path_detach_redeclare_test.go @@ -2,19 +2,22 @@ package itests import ( "context" + "os" + "os/exec" + "path/filepath" + "testing" + + logging "github.com/ipfs/go-log/v2" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/itests/kit" "github.com/filecoin-project/lotus/storage/paths" "github.com/filecoin-project/lotus/storage/sealer/sealtasks" "github.com/filecoin-project/lotus/storage/sealer/storiface" - logging "github.com/ipfs/go-log/v2" - "github.com/stretchr/testify/require" - "os" - "os/exec" - "path/filepath" - "testing" ) func TestPathDetachRedeclare(t *testing.T) { @@ -132,6 +135,165 @@ func TestPathDetachRedeclare(t *testing.T) { require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number) } +func TestPathDetachRedeclareWorker(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + _ = logging.SetLogLevel("storageminer", "INFO") + + var ( + client kit.TestFullNode + miner kit.TestMiner + wiw, wdw, sealw kit.TestWorker + ) + ens := kit.NewEnsemble(t, kit.LatestActorsAt(-1)). + FullNode(&client, kit.ThroughRPC()). + Miner(&miner, &client, kit.WithAllSubsystems(), kit.ThroughRPC(), kit.PresealSectors(2), kit.NoStorage()). + Worker(&miner, &wiw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWinningPoSt})). + Worker(&miner, &wdw, kit.ThroughRPC(), kit.NoStorage(), kit.WithTaskTypes([]sealtasks.TaskType{sealtasks.TTGenerateWindowPoSt})). + Worker(&miner, &sealw, kit.ThroughRPC(), kit.NoStorage(), kit.WithSealWorkerTasks). + Start() + + ens.InterconnectAll() + + // check there's only one path on the miner, none on the worker + sps, err := miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + + var id storiface.ID + for s := range sps { + id = s + } + + local, err := miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[id]), 1) + + oldLocal := local[id] + + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + // check sectors + checkSectors(t, ctx, client, miner, 2, 0) + + // detach preseal path from the miner + require.NoError(t, miner.StorageDetachLocal(ctx, oldLocal)) + + // check that there are no paths, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 0) + local, err = miner.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + checkSectors(t, ctx, client, miner, 2, 2) + + // attach a new path + newId := sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) { + cfg.CanStore = true + }) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[newId]), 1) + + newLocalTemp := local[newId] + + // move sector data to the new path + + // note: dest path already exist so we only want to .Join src + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "sealed"), newLocalTemp).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "cache"), newLocalTemp).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(oldLocal, "unsealed"), newLocalTemp).Run()) + + // check that sector files aren't indexed, post checks fail + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 0) + + // redeclare sectors + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + // check that sector files exist, post checks work + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + checkSectors(t, ctx, client, miner, 2, 0) + + // drop the path from the worker + require.NoError(t, sealw.StorageDetachLocal(ctx, newLocalTemp)) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 0) + + // add a new one again, and move the sectors there + newId = sealw.AddStorage(ctx, t, func(cfg *paths.LocalStorageMeta) { + cfg.CanStore = true + }) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + local, err = sealw.StorageLocal(ctx) + require.NoError(t, err) + require.Len(t, local, 1) + require.Greater(t, len(local[newId]), 1) + + newLocal := local[newId] + + // move sector data to the new path + + // note: dest path already exist so we only want to .Join src + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "sealed"), newLocal).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "cache"), newLocal).Run()) + require.NoError(t, exec.Command("cp", "--recursive", filepath.Join(newLocalTemp, "unsealed"), newLocal).Run()) + + // redeclare sectors + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + // check that sector files exist, post checks work + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + checkSectors(t, ctx, client, miner, 2, 0) + + // remove one sector, one check fails + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "sealed", "s-t01000-0"))) + require.NoError(t, os.RemoveAll(filepath.Join(newLocal, "cache", "s-t01000-0"))) + checkSectors(t, ctx, client, miner, 2, 1) + + // redeclare with no drop, still see sector in the index + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, false)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 2) + + // redeclare with drop, don't see the sector in the index + require.NoError(t, sealw.StorageRedeclareLocal(ctx, nil, true)) + + sps, err = miner.StorageList(ctx) + require.NoError(t, err) + require.Len(t, sps, 1) + require.Len(t, sps[newId], 1) + require.Equal(t, abi.SectorNumber(1), sps[newId][0].SectorID.Number) +} + func checkSectors(t *testing.T, ctx context.Context, api kit.TestFullNode, miner kit.TestMiner, expectChecked, expectBad int) { addr, err := miner.ActorAddress(ctx) require.NoError(t, err)