From 1461e475da004efd5d44ade6161c5d2bef259686 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 5 Mar 2020 23:02:01 +0100 Subject: [PATCH] storage: Support adding paths at runtime --- api/api_storage.go | 2 ++ api/apistruct/struct.go | 6 +++++ node/impl/storminer.go | 10 ++++++++ node/modules/services.go | 2 +- node/node_test.go | 41 ++++++++++-------------------- node/repo/memrepo.go | 42 ++++++++++++++++++++++++++++--- storage/sbmock/sbmock_test.go | 2 +- storage/sealmgr/advmgr/manager.go | 20 +++++++++++++++ storage/sealmgr/advmgr/storage.go | 38 +++++++++++++++------------- 9 files changed, 112 insertions(+), 51 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index ba6efc93c..6b3705b79 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -124,6 +124,8 @@ type StorageMiner interface { DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error) + + StorageAddLocal(ctx context.Context, path string) error } type SealRes struct { diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 8fb9ddfa8..e11a89aab 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -180,6 +180,8 @@ type StorageMinerStruct struct { DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"` DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"` + + StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"` } } @@ -645,6 +647,10 @@ func (c *StorageMinerStruct) DealsList(ctx context.Context) ([]storagemarket.Sto return c.Internal.DealsList(ctx) } +func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) error { + return c.Internal.StorageAddLocal(ctx, path) +} + var _ api.Common = &CommonStruct{} var _ api.FullNode = &FullNodeStruct{} var _ api.StorageMiner = &StorageMinerStruct{} diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 372211738..150f25130 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -21,6 +21,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/storage" + "github.com/filecoin-project/lotus/storage/sealmgr/advmgr" "github.com/filecoin-project/lotus/storage/sectorblocks" ) @@ -35,6 +36,7 @@ type StorageMinerAPI struct { Miner *storage.Miner BlockMiner *miner.Miner Full api.FullNode + StorageMgr *advmgr.Manager `optional:"true"` } func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) { @@ -300,4 +302,12 @@ func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fn return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi) } +func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error { + if sm.StorageMgr == nil { + return xerrors.Errorf("no storage manager") + } + + return sm.StorageMgr.AddLocalStorage(path) +} + var _ api.StorageMiner = &StorageMinerAPI{} diff --git a/node/modules/services.go b/node/modules/services.go index 49bc4e5ae..cac24391b 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -71,7 +71,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool) { ctx := helpers.LifecycleCtx(mctx, lc) - msgsub, err := ps.Subscribe(build.BlocksTopic) + msgsub, err := ps.Subscribe(build.MessagesTopic) if err != nil { panic(err) } diff --git a/node/node_test.go b/node/node_test.go index b97809b2e..2b5ac072d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -6,13 +6,10 @@ import ( "crypto/rand" "io/ioutil" "net/http/httptest" - "path/filepath" "testing" "time" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - badger "github.com/ipfs/go-ds-badger2" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/peer" @@ -39,11 +36,12 @@ import ( "github.com/filecoin-project/lotus/lib/jsonrpc" "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" - "github.com/filecoin-project/lotus/node/impl" "github.com/filecoin-project/lotus/node/modules" modtest "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/sbmock" + "github.com/filecoin-project/lotus/storage/sealmgr" + "github.com/filecoin-project/lotus/storage/sealmgr/advmgr" ) func init() { @@ -240,40 +238,24 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te f := fulls[full] if _, err := f.FullNode.WalletImport(ctx, &keys[i].KeyInfo); err != nil { - return nil, nil + t.Fatal(err) } if err := f.FullNode.WalletSetDefault(ctx, keys[i].Address); err != nil { - return nil, nil + t.Fatal(err) } genMiner := maddrs[i] wa := genms[i].Worker storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options()) - - sma := storers[i].StorageMiner.(*impl.StorageMinerAPI) - - psd := presealDirs[i] - mds, err := badger.NewDatastore(filepath.Join(psd, "badger"), nil) - if err != nil { - t.Fatal(err) - } - - osb, err := sectorbuilder.New(§orbuilder.Config{ - SealProofType: abi.RegisteredProof_StackedDRG2KiBSeal, - PoStProofType: abi.RegisteredProof_StackedDRG2KiBPoSt, - WorkerThreads: 2, - Miner: genMiner, - Paths: sectorbuilder.SimplePath(psd), - }, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder"))) - if err != nil { - t.Fatal(err) - } - - if err := sma.SectorBuilder.(*sectorbuilder.SectorBuilder).ImportFrom(osb, false); err != nil { + if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil { t.Fatal(err) } + /* + sma := storers[i].StorageMiner.(*impl.StorageMinerAPI) + psd := presealDirs[i] + */ } if err := mn.LinkAll(); err != nil { @@ -394,7 +376,10 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t wa := genms[i].Worker storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options( - node.Override(new(sectorbuilder.Interface), sbmock.NewMockSectorBuilder(5, build.SectorSizes[0])), + node.Override(new(sealmgr.Manager), func() (sealmgr.Manager, error) { + return sealmgr.NewSimpleManager(nil, genMiner, sbmock.NewMockSectorBuilder(5, build.SectorSizes[0])) + }), + node.Unset(new(*advmgr.Manager)), )) } diff --git a/node/repo/memrepo.go b/node/repo/memrepo.go index d1b491831..ddc283bee 100644 --- a/node/repo/memrepo.go +++ b/node/repo/memrepo.go @@ -1,10 +1,13 @@ package repo import ( + "encoding/json" "io/ioutil" "os" + "path/filepath" "sync" + "github.com/google/uuid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" dssync "github.com/ipfs/go-datastore/sync" @@ -40,7 +43,9 @@ type lockedMemRepo struct { } func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) { - panic("implement me") + return config.StorageConfig{StoragePaths: []config.LocalPath{ + {Path: lmem.Path()}, + }}, nil } func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error { @@ -48,14 +53,45 @@ func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error { } func (lmem *lockedMemRepo) Path() string { + lmem.Lock() + defer lmem.Unlock() + + if lmem.tempDir != "" { + return lmem.tempDir + } + t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-") if err != nil { panic(err) // only used in tests, probably fine } - lmem.Lock() + if lmem.t == StorageMiner { + if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), config.StorageConfig{ + StoragePaths: []config.LocalPath{ + {Path: t}, + }}); err != nil { + panic(err) + } + + b, err := json.MarshalIndent(&config.StorageMeta{ + ID: uuid.New().String(), + Weight: 10, + CanSeal: true, + CanStore: true, + }, "", " ") + if err != nil { + panic(err) + } + + if err := ioutil.WriteFile(filepath.Join(t, "sectorstore.json"), b, 0644); err != nil { + panic(err) + } + } + + + + lmem.tempDir = t - lmem.Unlock() return t } diff --git a/storage/sbmock/sbmock_test.go b/storage/sbmock/sbmock_test.go index d76efa375..6bfe9e062 100644 --- a/storage/sbmock/sbmock_test.go +++ b/storage/sbmock/sbmock_test.go @@ -20,7 +20,7 @@ func TestOpFinish(t *testing.T) { finished := make(chan struct{}) go func() { - _, _, err := sb.SealPreCommit(ctx, sid, abi.SealRandomness{}, pieces) + _, err := sb.SealPreCommit1(ctx, sid, abi.SealRandomness{}, pieces) if err != nil { t.Error(err) return diff --git a/storage/sealmgr/advmgr/manager.go b/storage/sealmgr/advmgr/manager.go index bbe241c1d..cbc3dfb97 100644 --- a/storage/sealmgr/advmgr/manager.go +++ b/storage/sealmgr/advmgr/manager.go @@ -5,6 +5,7 @@ import ( "io" "github.com/ipfs/go-cid" + "github.com/mitchellh/go-homedir" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -84,6 +85,25 @@ func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manag return m, nil } +func (m *Manager) AddLocalStorage(path string) error { + path, err := homedir.Expand(path) + if err != nil { + return xerrors.Errorf("expanding local path: %w", err) + } + + if err := m.storage.openPath(path); err != nil { + return xerrors.Errorf("opening local path: %w", err) + } + + sc, err := m.storage.localStorage.GetStorage() + if err != nil { + return xerrors.Errorf("get storage config: %w", err) + } + + sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path}) + return nil +} + func (m *Manager) SectorSize() abi.SectorSize { sz, _ := m.scfg.SealProofType.SectorSize() return sz diff --git a/storage/sealmgr/advmgr/storage.go b/storage/sealmgr/advmgr/storage.go index cb5533c21..564935942 100644 --- a/storage/sealmgr/advmgr/storage.go +++ b/storage/sealmgr/advmgr/storage.go @@ -34,7 +34,19 @@ type path struct { sectors map[abi.SectorID]sectorbuilder.SectorFileType } -func openPath(p string, meta config.StorageMeta) (path, error) { +func (st *storage) openPath(p string) error { + mb, err := ioutil.ReadFile(filepath.Join(p, metaFile)) + if err != nil { + return xerrors.Errorf("reading storage metadata for %s: %w", p, err) + } + + var meta config.StorageMeta + if err := json.Unmarshal(mb, &meta); err != nil { + return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err) + } + + // TODO: Check existing / dedupe + out := path{ meta: meta, local: p, @@ -46,25 +58,27 @@ func openPath(p string, meta config.StorageMeta) (path, error) { if err != nil { if os.IsNotExist(err) { if err := os.MkdirAll(filepath.Join(p, t.String()), 0755); err != nil { - return path{}, xerrors.Errorf("mkdir '%s': %w", filepath.Join(p, t.String()), err) + return xerrors.Errorf("mkdir '%s': %w", filepath.Join(p, t.String()), err) } continue } - return path{}, xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err) + return xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err) } for _, ent := range ents { sid, err := parseSectorID(ent.Name()) if err != nil { - return path{}, xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) + return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err) } out.sectors[sid] |= t } } - return out, nil + st.paths = append(st.paths, out) + + return nil } func (st *storage) open() error { @@ -81,22 +95,10 @@ func (st *storage) open() error { } for _, path := range cfg.StoragePaths { - mb, err := ioutil.ReadFile(filepath.Join(path.Path, metaFile)) - if err != nil { - return xerrors.Errorf("reading storage metadata for %s: %w", path.Path, err) - } - - var meta config.StorageMeta - if err := json.Unmarshal(mb, &meta); err != nil { - return xerrors.Errorf("unmarshalling storage metadata for %s: %w", path.Path, err) - } - - pi, err := openPath(path.Path, meta) + err := st.openPath(path.Path) if err != nil { return xerrors.Errorf("opening path %s: %w", path.Path, err) } - - st.paths = append(st.paths, pi) } return nil