storage: Support adding paths at runtime
This commit is contained in:
parent
a5e5918fc5
commit
1461e475da
@ -124,6 +124,8 @@ type StorageMiner interface {
|
||||
|
||||
DealsImportData(ctx context.Context, dealPropCid cid.Cid, file string) error
|
||||
DealsList(ctx context.Context) ([]storagemarket.StorageDeal, error)
|
||||
|
||||
StorageAddLocal(ctx context.Context, path string) error
|
||||
}
|
||||
|
||||
type SealRes struct {
|
||||
|
@ -180,6 +180,8 @@ type StorageMinerStruct struct {
|
||||
|
||||
DealsImportData func(ctx context.Context, dealPropCid cid.Cid, file string) error `perm:"write"`
|
||||
DealsList func(ctx context.Context) ([]storagemarket.StorageDeal, error) `perm:"read"`
|
||||
|
||||
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`
|
||||
}
|
||||
}
|
||||
|
||||
@ -645,6 +647,10 @@ func (c *StorageMinerStruct) DealsList(ctx context.Context) ([]storagemarket.Sto
|
||||
return c.Internal.DealsList(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) StorageAddLocal(ctx context.Context, path string) error {
|
||||
return c.Internal.StorageAddLocal(ctx, path)
|
||||
}
|
||||
|
||||
var _ api.Common = &CommonStruct{}
|
||||
var _ api.FullNode = &FullNodeStruct{}
|
||||
var _ api.StorageMiner = &StorageMinerStruct{}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/storage"
|
||||
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
|
||||
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
||||
)
|
||||
|
||||
@ -35,6 +36,7 @@ type StorageMinerAPI struct {
|
||||
Miner *storage.Miner
|
||||
BlockMiner *miner.Miner
|
||||
Full api.FullNode
|
||||
StorageMgr *advmgr.Manager `optional:"true"`
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) ServeRemote(w http.ResponseWriter, r *http.Request) {
|
||||
@ -300,4 +302,12 @@ func (sm *StorageMinerAPI) DealsImportData(ctx context.Context, deal cid.Cid, fn
|
||||
return sm.StorageProvider.ImportDataForDeal(ctx, deal, fi)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) StorageAddLocal(ctx context.Context, path string) error {
|
||||
if sm.StorageMgr == nil {
|
||||
return xerrors.Errorf("no storage manager")
|
||||
}
|
||||
|
||||
return sm.StorageMgr.AddLocalStorage(path)
|
||||
}
|
||||
|
||||
var _ api.StorageMiner = &StorageMinerAPI{}
|
||||
|
@ -71,7 +71,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool) {
|
||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||
|
||||
msgsub, err := ps.Subscribe(build.BlocksTopic)
|
||||
msgsub, err := ps.Subscribe(build.MessagesTopic)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -6,13 +6,10 @@ import (
|
||||
"crypto/rand"
|
||||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
badger "github.com/ipfs/go-ds-badger2"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/libp2p/go-libp2p-core/crypto"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
@ -39,11 +36,12 @@ import (
|
||||
"github.com/filecoin-project/lotus/lib/jsonrpc"
|
||||
"github.com/filecoin-project/lotus/miner"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
"github.com/filecoin-project/lotus/node/modules"
|
||||
modtest "github.com/filecoin-project/lotus/node/modules/testing"
|
||||
"github.com/filecoin-project/lotus/node/repo"
|
||||
"github.com/filecoin-project/lotus/storage/sbmock"
|
||||
"github.com/filecoin-project/lotus/storage/sealmgr"
|
||||
"github.com/filecoin-project/lotus/storage/sealmgr/advmgr"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -240,40 +238,24 @@ func builder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []test.Te
|
||||
|
||||
f := fulls[full]
|
||||
if _, err := f.FullNode.WalletImport(ctx, &keys[i].KeyInfo); err != nil {
|
||||
return nil, nil
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := f.FullNode.WalletSetDefault(ctx, keys[i].Address); err != nil {
|
||||
return nil, nil
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
genMiner := maddrs[i]
|
||||
wa := genms[i].Worker
|
||||
|
||||
storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options())
|
||||
|
||||
if err := storers[i].StorageAddLocal(ctx, presealDirs[i]); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
/*
|
||||
sma := storers[i].StorageMiner.(*impl.StorageMinerAPI)
|
||||
|
||||
psd := presealDirs[i]
|
||||
mds, err := badger.NewDatastore(filepath.Join(psd, "badger"), nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
osb, err := sectorbuilder.New(§orbuilder.Config{
|
||||
SealProofType: abi.RegisteredProof_StackedDRG2KiBSeal,
|
||||
PoStProofType: abi.RegisteredProof_StackedDRG2KiBPoSt,
|
||||
WorkerThreads: 2,
|
||||
Miner: genMiner,
|
||||
Paths: sectorbuilder.SimplePath(psd),
|
||||
}, namespace.Wrap(mds, datastore.NewKey("/sectorbuilder")))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := sma.SectorBuilder.(*sectorbuilder.SectorBuilder).ImportFrom(osb, false); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
*/
|
||||
}
|
||||
|
||||
if err := mn.LinkAll(); err != nil {
|
||||
@ -394,7 +376,10 @@ func mockSbBuilder(t *testing.T, nFull int, storage []int) ([]test.TestNode, []t
|
||||
wa := genms[i].Worker
|
||||
|
||||
storers[i] = testStorageNode(ctx, t, wa, genMiner, pk, f, mn, node.Options(
|
||||
node.Override(new(sectorbuilder.Interface), sbmock.NewMockSectorBuilder(5, build.SectorSizes[0])),
|
||||
node.Override(new(sealmgr.Manager), func() (sealmgr.Manager, error) {
|
||||
return sealmgr.NewSimpleManager(nil, genMiner, sbmock.NewMockSectorBuilder(5, build.SectorSizes[0]))
|
||||
}),
|
||||
node.Unset(new(*advmgr.Manager)),
|
||||
))
|
||||
}
|
||||
|
||||
|
@ -1,10 +1,13 @@
|
||||
package repo
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/ipfs/go-datastore"
|
||||
"github.com/ipfs/go-datastore/namespace"
|
||||
dssync "github.com/ipfs/go-datastore/sync"
|
||||
@ -40,7 +43,9 @@ type lockedMemRepo struct {
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) GetStorage() (config.StorageConfig, error) {
|
||||
panic("implement me")
|
||||
return config.StorageConfig{StoragePaths: []config.LocalPath{
|
||||
{Path: lmem.Path()},
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error {
|
||||
@ -48,14 +53,45 @@ func (lmem *lockedMemRepo) SetStorage(config.StorageConfig) error {
|
||||
}
|
||||
|
||||
func (lmem *lockedMemRepo) Path() string {
|
||||
lmem.Lock()
|
||||
defer lmem.Unlock()
|
||||
|
||||
if lmem.tempDir != "" {
|
||||
return lmem.tempDir
|
||||
}
|
||||
|
||||
t, err := ioutil.TempDir(os.TempDir(), "lotus-memrepo-temp-")
|
||||
if err != nil {
|
||||
panic(err) // only used in tests, probably fine
|
||||
}
|
||||
|
||||
lmem.Lock()
|
||||
if lmem.t == StorageMiner {
|
||||
if err := config.WriteStorageFile(filepath.Join(t, fsStorageConfig), config.StorageConfig{
|
||||
StoragePaths: []config.LocalPath{
|
||||
{Path: t},
|
||||
}}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
b, err := json.MarshalIndent(&config.StorageMeta{
|
||||
ID: uuid.New().String(),
|
||||
Weight: 10,
|
||||
CanSeal: true,
|
||||
CanStore: true,
|
||||
}, "", " ")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(filepath.Join(t, "sectorstore.json"), b, 0644); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
lmem.tempDir = t
|
||||
lmem.Unlock()
|
||||
return t
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,7 @@ func TestOpFinish(t *testing.T) {
|
||||
|
||||
finished := make(chan struct{})
|
||||
go func() {
|
||||
_, _, err := sb.SealPreCommit(ctx, sid, abi.SealRandomness{}, pieces)
|
||||
_, err := sb.SealPreCommit1(ctx, sid, abi.SealRandomness{}, pieces)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/mitchellh/go-homedir"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
@ -84,6 +85,25 @@ func New(ls LocalStorage, cfg *sectorbuilder.Config, sc SectorIDCounter) (*Manag
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (m *Manager) AddLocalStorage(path string) error {
|
||||
path, err := homedir.Expand(path)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("expanding local path: %w", err)
|
||||
}
|
||||
|
||||
if err := m.storage.openPath(path); err != nil {
|
||||
return xerrors.Errorf("opening local path: %w", err)
|
||||
}
|
||||
|
||||
sc, err := m.storage.localStorage.GetStorage()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("get storage config: %w", err)
|
||||
}
|
||||
|
||||
sc.StoragePaths = append(sc.StoragePaths, config.LocalPath{Path: path})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) SectorSize() abi.SectorSize {
|
||||
sz, _ := m.scfg.SealProofType.SectorSize()
|
||||
return sz
|
||||
|
@ -34,7 +34,19 @@ type path struct {
|
||||
sectors map[abi.SectorID]sectorbuilder.SectorFileType
|
||||
}
|
||||
|
||||
func openPath(p string, meta config.StorageMeta) (path, error) {
|
||||
func (st *storage) openPath(p string) error {
|
||||
mb, err := ioutil.ReadFile(filepath.Join(p, metaFile))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading storage metadata for %s: %w", p, err)
|
||||
}
|
||||
|
||||
var meta config.StorageMeta
|
||||
if err := json.Unmarshal(mb, &meta); err != nil {
|
||||
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", p, err)
|
||||
}
|
||||
|
||||
// TODO: Check existing / dedupe
|
||||
|
||||
out := path{
|
||||
meta: meta,
|
||||
local: p,
|
||||
@ -46,25 +58,27 @@ func openPath(p string, meta config.StorageMeta) (path, error) {
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(filepath.Join(p, t.String()), 0755); err != nil {
|
||||
return path{}, xerrors.Errorf("mkdir '%s': %w", filepath.Join(p, t.String()), err)
|
||||
return xerrors.Errorf("mkdir '%s': %w", filepath.Join(p, t.String()), err)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
return path{}, xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err)
|
||||
return xerrors.Errorf("listing %s: %w", filepath.Join(p, t.String()), err)
|
||||
}
|
||||
|
||||
for _, ent := range ents {
|
||||
sid, err := parseSectorID(ent.Name())
|
||||
if err != nil {
|
||||
return path{}, xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
|
||||
return xerrors.Errorf("parse sector id %s: %w", ent.Name(), err)
|
||||
}
|
||||
|
||||
out.sectors[sid] |= t
|
||||
}
|
||||
}
|
||||
|
||||
return out, nil
|
||||
st.paths = append(st.paths, out)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (st *storage) open() error {
|
||||
@ -81,22 +95,10 @@ func (st *storage) open() error {
|
||||
}
|
||||
|
||||
for _, path := range cfg.StoragePaths {
|
||||
mb, err := ioutil.ReadFile(filepath.Join(path.Path, metaFile))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("reading storage metadata for %s: %w", path.Path, err)
|
||||
}
|
||||
|
||||
var meta config.StorageMeta
|
||||
if err := json.Unmarshal(mb, &meta); err != nil {
|
||||
return xerrors.Errorf("unmarshalling storage metadata for %s: %w", path.Path, err)
|
||||
}
|
||||
|
||||
pi, err := openPath(path.Path, meta)
|
||||
err := st.openPath(path.Path)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("opening path %s: %w", path.Path, err)
|
||||
}
|
||||
|
||||
st.paths = append(st.paths, pi)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user