From c73326e96dc8a032e8b8b76533ab14789846874f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Mon, 1 Jun 2020 20:11:55 +0200 Subject: [PATCH] fsrepo: Use LevelDB for metadata datastore --- api/test/test.go | 1 + api/test/window_post.go | 6 +-- chain/gen/gen.go | 2 +- chain/store/store_test.go | 2 +- cmd/lotus-shed/import-car.go | 2 +- cmd/lotus/daemon.go | 2 +- go.mod | 3 ++ go.sum | 3 ++ node/modules/chain.go | 2 +- node/repo/fsrepo.go | 18 --------- node/repo/fsrepo_ds.go | 74 ++++++++++++++++++++++++++++++++++++ 11 files changed, 89 insertions(+), 26 deletions(-) create mode 100644 node/repo/fsrepo_ds.go diff --git a/api/test/test.go b/api/test/test.go index 06f0584fd..5972ca757 100644 --- a/api/test/test.go +++ b/api/test/test.go @@ -20,6 +20,7 @@ type TestStorageNode struct { } var PresealGenesis = -1 + const GenesisPreseals = 2 type StorageMiner struct { diff --git a/api/test/window_post.go b/api/test/window_post.go index b821df2a7..1239d5424 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -143,11 +143,11 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector head, err := client.ChainHead(ctx) require.NoError(t, err) - if head.Height() > di.PeriodStart + (miner2.WPoStProvingPeriod) + 2 { + if head.Height() > di.PeriodStart+(miner2.WPoStProvingPeriod)+2 { break } - if head.Height() % 100 == 0 { + if head.Height()%100 == 0 { fmt.Printf("@%d\n", head.Height()) } time.Sleep(blocktime) @@ -160,7 +160,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector require.NoError(t, err) require.Equal(t, p.MinerPower, p.TotalPower) - require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz) * uint64(nSectors + GenesisPreseals))) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*uint64(nSectors+GenesisPreseals))) // TODO: Inject faults here diff --git a/chain/gen/gen.go b/chain/gen/gen.go index b0c4cc044..de2ad9226 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -104,7 +104,7 @@ func NewGeneratorWithSectors(numSectors int) (*ChainGen, error) { return nil, xerrors.Errorf("failed to get metadata datastore: %w", err) } - bds, err := lr.Datastore("/blocks") + bds, err := lr.Datastore("/chain") if err != nil { return nil, xerrors.Errorf("failed to get blocks datastore: %w", err) } diff --git a/chain/store/store_test.go b/chain/store/store_test.go index 495b91b61..41b875916 100644 --- a/chain/store/store_test.go +++ b/chain/store/store_test.go @@ -55,7 +55,7 @@ func BenchmarkGetRandomness(b *testing.B) { b.Fatal(err) } - bds, err := lr.Datastore("/blocks") + bds, err := lr.Datastore("/chain") if err != nil { b.Fatal(err) } diff --git a/cmd/lotus-shed/import-car.go b/cmd/lotus-shed/import-car.go index f2f517a7f..970341a3a 100644 --- a/cmd/lotus-shed/import-car.go +++ b/cmd/lotus-shed/import-car.go @@ -42,7 +42,7 @@ var importCarCmd = &cli.Command{ return xerrors.Errorf("opening the car file: %w", err) } - ds, err := lr.Datastore("/blocks") + ds, err := lr.Datastore("/chain") if err != nil { return err } diff --git a/cmd/lotus/daemon.go b/cmd/lotus/daemon.go index 9e4e7d1a3..2894e0eea 100644 --- a/cmd/lotus/daemon.go +++ b/cmd/lotus/daemon.go @@ -271,7 +271,7 @@ func ImportChain(r repo.Repo, fname string) error { } defer lr.Close() //nolint:errcheck - ds, err := lr.Datastore("/blocks") + ds, err := lr.Datastore("/chain") if err != nil { return err } diff --git a/go.mod b/go.mod index d8659f350..e4a4a9993 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,8 @@ require ( github.com/ipfs/go-cid v0.0.6-0.20200501230655-7c82f3b81c00 github.com/ipfs/go-datastore v0.4.4 github.com/ipfs/go-ds-badger2 v0.1.0 + github.com/ipfs/go-ds-leveldb v0.4.2 + github.com/ipfs/go-ds-measure v0.1.0 github.com/ipfs/go-filestore v1.0.0 github.com/ipfs/go-fs-lock v0.0.1 github.com/ipfs/go-graphsync v0.0.6-0.20200504202014-9d5f2c26a103 @@ -101,6 +103,7 @@ require ( github.com/opentracing/opentracing-go v1.1.0 github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.5.1 + github.com/syndtr/goleveldb v1.0.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba github.com/whyrusleeping/cbor-gen v0.0.0-20200504204219-64967432584d github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 diff --git a/go.sum b/go.sum index c68604e6d..57da8c1ce 100644 --- a/go.sum +++ b/go.sum @@ -387,6 +387,7 @@ github.com/ipfs/go-datastore v0.0.1/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAK github.com/ipfs/go-datastore v0.0.5/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE= github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= +github.com/ipfs/go-datastore v0.3.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= github.com/ipfs/go-datastore v0.3.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw= github.com/ipfs/go-datastore v0.4.0/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= github.com/ipfs/go-datastore v0.4.1/go.mod h1:SX/xMIKoCszPqp+z9JhPYCmoOoXTvaa13XEbGtsFUhA= @@ -407,6 +408,8 @@ github.com/ipfs/go-ds-leveldb v0.1.0/go.mod h1:hqAW8y4bwX5LWcCtku2rFNX3vjDZCy5LZ github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= github.com/ipfs/go-ds-leveldb v0.4.2 h1:QmQoAJ9WkPMUfBLnu1sBVy0xWWlJPg0m4kRAiJL9iaw= github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= +github.com/ipfs/go-ds-measure v0.1.0 h1:vE4TyY4aeLeVgnnPBC5QzKIjKrqzha0NCujTfgvVbVQ= +github.com/ipfs/go-ds-measure v0.1.0/go.mod h1:1nDiFrhLlwArTME1Ees2XaBOl49OoCgd2A3f8EchMSY= github.com/ipfs/go-filestore v1.0.0 h1:QR7ekKH+q2AGiWDc7W2Q0qHuYSRZGUJqUn0GsegEPb0= github.com/ipfs/go-filestore v1.0.0/go.mod h1:/XOCuNtIe2f1YPbiXdYvD0BKLA0JR1MgPiFOdcuu9SM= github.com/ipfs/go-fs-lock v0.0.1 h1:XHX8uW4jQBYWHj59XXcjg7BHlHxV9ZOYs6Y43yb7/l0= diff --git a/node/modules/chain.go b/node/modules/chain.go index ca0281acf..229a97cc4 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -61,7 +61,7 @@ func MessagePool(lc fx.Lifecycle, sm *stmgr.StateManager, ps *pubsub.PubSub, ds } func ChainBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.LockedRepo) (dtypes.ChainBlockstore, error) { - blocks, err := r.Datastore("/blocks") + blocks, err := r.Datastore("/chain") if err != nil { return nil, err } diff --git a/node/repo/fsrepo.go b/node/repo/fsrepo.go index a1cc9a6ec..75b560d65 100644 --- a/node/repo/fsrepo.go +++ b/node/repo/fsrepo.go @@ -12,8 +12,6 @@ import ( "sync" "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - badger "github.com/ipfs/go-ds-badger2" fslock "github.com/ipfs/go-fs-lock" logging "github.com/ipfs/go-log/v2" "github.com/mitchellh/go-homedir" @@ -267,22 +265,6 @@ func (fsr *fsLockedRepo) stillValid() error { return nil } -func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { - fsr.dsOnce.Do(func() { - opts := badger.DefaultOptions - opts.Truncate = true - - fsr.ds, fsr.dsErr = badger.NewDatastore(fsr.join(fsDatastore), &opts) - /*if fsr.dsErr == nil { - fsr.ds = datastore.NewLogDatastore(fsr.ds, "fsrepo") - }*/ - }) - if fsr.dsErr != nil { - return nil, fsr.dsErr - } - return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil -} - func (fsr *fsLockedRepo) Config() (interface{}, error) { if err := fsr.stillValid(); err != nil { return nil, err diff --git a/node/repo/fsrepo_ds.go b/node/repo/fsrepo_ds.go new file mode 100644 index 000000000..034635c4f --- /dev/null +++ b/node/repo/fsrepo_ds.go @@ -0,0 +1,74 @@ +package repo + +import ( + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/mount" + "github.com/ipfs/go-datastore/namespace" + "golang.org/x/xerrors" + "os" + "path/filepath" + + badger "github.com/ipfs/go-ds-badger2" + levelds "github.com/ipfs/go-ds-leveldb" + "github.com/ipfs/go-ds-measure" + ldbopts "github.com/syndtr/goleveldb/leveldb/opt" +) + +var fsDatastores = map[string]func(path string) (datastore.Batching, error){ + "chain": badgerDs, + "metadata": levelDs, + + // Those need to be fast for large writes... but also need a really good GC :c + "staging": badgerDs, // miner specific + "client": badgerDs, // client specific +} + +func badgerDs(path string) (datastore.Batching, error) { + opts := badger.DefaultOptions + opts.Truncate = true + + return badger.NewDatastore(path, &opts) +} + +func levelDs(path string) (datastore.Batching, error) { + return levelds.NewDatastore(path, &levelds.Options{ + Compression: ldbopts.NoCompression, + }) +} + +func (fsr *fsLockedRepo) openDatastore() (datastore.Batching, error) { + if err := os.MkdirAll(fsr.join(fsDatastore), 0755); err != nil { + return nil, xerrors.Errorf("mkdir %s: %w", fsr.join(fsDatastore), err) + } + + var mounts []mount.Mount + + for p, ctor := range fsDatastores { + prefix := datastore.NewKey(p) + + // TODO: optimization: don't init datastores we don't need + ds, err := ctor(fsr.join(filepath.Join(fsDatastore, p))) + if err != nil { + return nil, xerrors.Errorf("opening datastore %s: %w", prefix, err) + } + + ds = measure.New("fsrepo."+p, ds) + + mounts = append(mounts, mount.Mount{ + Prefix: prefix, + Datastore: ds, + }) + } + + return mount.New(mounts), nil +} + +func (fsr *fsLockedRepo) Datastore(ns string) (datastore.Batching, error) { + fsr.dsOnce.Do(func() { + fsr.ds, fsr.dsErr = fsr.openDatastore() + }) + if fsr.dsErr != nil { + return nil, fsr.dsErr + } + return namespace.Wrap(fsr.ds, datastore.NewKey(ns)), nil +}