split off lmdb support to a different branch.
This commit is contained in:
parent
3733456bca
commit
1b51c10d78
@ -27,8 +27,6 @@ func NewLiveSetEnv(path string, liveSetType string) (LiveSetEnv, error) {
|
||||
return NewBloomLiveSetEnv()
|
||||
case "bolt":
|
||||
return NewBoltLiveSetEnv(filepath.Join(path, "sweep.bolt"))
|
||||
case "lmdb":
|
||||
return NewLMDBLiveSetEnv(filepath.Join(path, "sweep.lmdb"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown live set type %s", liveSetType)
|
||||
}
|
||||
|
@ -1,117 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
"github.com/ledgerwatch/lmdb-go/lmdb"
|
||||
)
|
||||
|
||||
var LMDBLiveSetMapSize int64 = 1 << 34 // 16G; TODO grow the map dynamically
|
||||
|
||||
type LMDBLiveSetEnv struct {
|
||||
env *lmdb.Env
|
||||
}
|
||||
|
||||
var _ LiveSetEnv = (*LMDBLiveSetEnv)(nil)
|
||||
|
||||
type LMDBLiveSet struct {
|
||||
env *lmdb.Env
|
||||
db lmdb.DBI
|
||||
}
|
||||
|
||||
var _ LiveSet = (*LMDBLiveSet)(nil)
|
||||
|
||||
func NewLMDBLiveSetEnv(path string) (*LMDBLiveSetEnv, error) {
|
||||
env, err := lmdb.NewEnv()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to initialize LDMB env: %w", err)
|
||||
}
|
||||
if err = env.SetMapSize(LMDBLiveSetMapSize); err != nil {
|
||||
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
|
||||
}
|
||||
if err = env.SetMaxDBs(2); err != nil {
|
||||
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
|
||||
}
|
||||
// if err = env.SetMaxReaders(1); err != nil {
|
||||
// return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err)
|
||||
// }
|
||||
|
||||
if st, err := os.Stat(path); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
|
||||
} else if !st.IsDir() {
|
||||
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
|
||||
}
|
||||
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
|
||||
if err != nil {
|
||||
env.Close() //nolint:errcheck
|
||||
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
|
||||
}
|
||||
|
||||
return &LMDBLiveSetEnv{env: env}, nil
|
||||
}
|
||||
|
||||
func (e *LMDBLiveSetEnv) NewLiveSet(name string, hint int64) (LiveSet, error) {
|
||||
return NewLMDBLiveSet(e.env, name+".lmdb")
|
||||
}
|
||||
|
||||
func (e *LMDBLiveSetEnv) Close() error {
|
||||
return e.env.Close()
|
||||
}
|
||||
|
||||
func NewLMDBLiveSet(env *lmdb.Env, name string) (*LMDBLiveSet, error) {
|
||||
var db lmdb.DBI
|
||||
err := env.Update(func(txn *lmdb.Txn) (err error) {
|
||||
db, err = txn.CreateDBI(name)
|
||||
return
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &LMDBLiveSet{env: env, db: db}, nil
|
||||
}
|
||||
|
||||
func (s *LMDBLiveSet) Mark(cid cid.Cid) error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
err := txn.Put(s.db, cid.Hash(), markBytes, 0)
|
||||
if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBLiveSet) Has(cid cid.Cid) (has bool, err error) {
|
||||
err = s.env.View(func(txn *lmdb.Txn) error {
|
||||
txn.RawRead = true
|
||||
|
||||
_, err := txn.Get(s.db, cid.Hash())
|
||||
if err != nil {
|
||||
if lmdb.IsNotFound(err) {
|
||||
has = false
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
has = true
|
||||
return nil
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *LMDBLiveSet) Close() error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
return txn.Drop(s.db, true)
|
||||
})
|
||||
}
|
@ -8,10 +8,6 @@ import (
|
||||
"github.com/multiformats/go-multihash"
|
||||
)
|
||||
|
||||
func TestLMDBLiveSet(t *testing.T) {
|
||||
testLiveSet(t, "lmdb")
|
||||
}
|
||||
|
||||
func TestBoltLiveSet(t *testing.T) {
|
||||
testLiveSet(t, "bolt")
|
||||
}
|
||||
|
@ -1,27 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/ledgerwatch/lmdb-go/lmdb"
|
||||
)
|
||||
|
||||
func withMaxReadersRetry(f func() error) error {
|
||||
retry:
|
||||
err := f()
|
||||
if err != nil && lmdb.IsErrno(err, lmdb.ReadersFull) {
|
||||
dt := time.Microsecond + time.Duration(rand.Intn(int(10*time.Microsecond)))
|
||||
log.Debugf("MDB_READERS_FULL; retrying operation in %s", dt)
|
||||
time.Sleep(dt)
|
||||
goto retry
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error performing lmdb operation: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -24,8 +24,6 @@ func NewTrackingStore(path string, trackingStoreType string) (TrackingStore, err
|
||||
switch trackingStoreType {
|
||||
case "", "bolt":
|
||||
return NewBoltTrackingStore(filepath.Join(path, "snoop.bolt"))
|
||||
case "lmdb":
|
||||
return NewLMDBTrackingStore(filepath.Join(path, "snoop.lmdb"))
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", trackingStoreType)
|
||||
}
|
||||
|
@ -1,180 +0,0 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"os"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
"github.com/ledgerwatch/lmdb-go/lmdb"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
var LMDBTrackingStoreMapSize int64 = 1 << 34 // 16G -- TODO grow the map dynamically
|
||||
|
||||
type LMDBTrackingStore struct {
|
||||
env *lmdb.Env
|
||||
db lmdb.DBI
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*LMDBTrackingStore)(nil)
|
||||
|
||||
func NewLMDBTrackingStore(path string) (*LMDBTrackingStore, error) {
|
||||
env, err := lmdb.NewEnv()
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to initialize LMDB env: %w", err)
|
||||
}
|
||||
if err = env.SetMapSize(LMDBTrackingStoreMapSize); err != nil {
|
||||
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
|
||||
}
|
||||
if err = env.SetMaxDBs(1); err != nil {
|
||||
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
|
||||
}
|
||||
|
||||
if st, err := os.Stat(path); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(path, 0777); err != nil {
|
||||
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
|
||||
} else if !st.IsDir() {
|
||||
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
|
||||
}
|
||||
|
||||
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
|
||||
if err != nil {
|
||||
env.Close() //nolint:errcheck
|
||||
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
|
||||
}
|
||||
|
||||
s := new(LMDBTrackingStore)
|
||||
s.env = env
|
||||
err = env.Update(func(txn *lmdb.Txn) (err error) {
|
||||
s.db, err = txn.CreateDBI("snoop")
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("error creating tracking store: %w", err)
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
err := txn.Put(s.db, cid.Hash(), val, 0)
|
||||
if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
val := epochToBytes(epoch)
|
||||
return withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
for _, cid := range cids {
|
||||
err := txn.Put(s.db, cid.Hash(), val, 0)
|
||||
if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) {
|
||||
continue
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
|
||||
err = withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.View(func(txn *lmdb.Txn) error {
|
||||
txn.RawRead = true
|
||||
|
||||
val, err := txn.Get(s.db, cid.Hash())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
epoch = bytesToEpoch(val)
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) Delete(cid cid.Cid) error {
|
||||
return withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
return txn.Del(s.db, cid.Hash(), nil)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) DeleteBatch(cids map[cid.Cid]struct{}) error {
|
||||
return withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.Update(func(txn *lmdb.Txn) error {
|
||||
for cid := range cids {
|
||||
err := txn.Del(s.db, cid.Hash(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
return withMaxReadersRetry(
|
||||
func() error {
|
||||
return s.env.View(func(txn *lmdb.Txn) error {
|
||||
txn.RawRead = true
|
||||
cur, err := txn.OpenCursor(s.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer cur.Close()
|
||||
|
||||
for {
|
||||
k, v, err := cur.Get(nil, nil, lmdb.Next)
|
||||
if err != nil {
|
||||
if lmdb.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
cid := cid.NewCidV1(cid.Raw, k)
|
||||
epoch := bytesToEpoch(v)
|
||||
|
||||
err = f(cid, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) Sync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LMDBTrackingStore) Close() error {
|
||||
s.env.CloseDBI(s.db)
|
||||
return s.env.Close()
|
||||
}
|
@ -10,10 +10,6 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
)
|
||||
|
||||
func TestLMDBTrackingStore(t *testing.T) {
|
||||
testTrackingStore(t, "lmdb")
|
||||
}
|
||||
|
||||
func TestBoltTrackingStore(t *testing.T) {
|
||||
testTrackingStore(t, "bolt")
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -28,7 +28,6 @@ require (
|
||||
github.com/filecoin-project/go-address v0.0.5
|
||||
github.com/filecoin-project/go-amt-ipld/v2 v2.1.1-0.20201006184820-924ee87a1349 // indirect
|
||||
github.com/filecoin-project/go-bitfield v0.2.4
|
||||
github.com/filecoin-project/go-bs-lmdb v1.0.3
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2
|
||||
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434
|
||||
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03
|
||||
@ -96,7 +95,6 @@ require (
|
||||
github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d
|
||||
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
|
||||
github.com/kelseyhightower/envconfig v1.4.0
|
||||
github.com/ledgerwatch/lmdb-go v1.17.4
|
||||
github.com/lib/pq v1.7.0
|
||||
github.com/libp2p/go-buffer-pool v0.0.2
|
||||
github.com/libp2p/go-eventbus v0.2.1
|
||||
|
6
go.sum
6
go.sum
@ -246,8 +246,6 @@ github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQj
|
||||
github.com/filecoin-project/go-bitfield v0.2.3/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
github.com/filecoin-project/go-bitfield v0.2.4 h1:uZ7MeE+XfM5lqrHJZ93OnhQKc/rveW8p9au0C68JPgk=
|
||||
github.com/filecoin-project/go-bitfield v0.2.4/go.mod h1:CNl9WG8hgR5mttCnUErjcQjGvuiZjRqK9rHVBsQF4oM=
|
||||
github.com/filecoin-project/go-bs-lmdb v1.0.3 h1:QRf/yMw5hFjqMIpi9mi/Hkh4qberUI++56XAdB0VgwM=
|
||||
github.com/filecoin-project/go-bs-lmdb v1.0.3/go.mod h1:peFIZ9XEE9OLFkCzi7FMlr84UexqVKj6+AyxZD5SiGs=
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2 h1:av5fw6wmm58FYMgJeoB/lK9XXrgdugYiTqkdxjTy9k8=
|
||||
github.com/filecoin-project/go-cbor-util v0.0.0-20191219014500-08c40a1e63a2/go.mod h1:pqTiPHobNkOVM5thSRsHYjyQfq7O5QSCMhvuu9JoDlg=
|
||||
github.com/filecoin-project/go-commp-utils v0.0.0-20201119054358-b88f7a96a434 h1:0kHszkYP3hgApcjl5x4rpwONhN9+j7XDobf6at5XfHs=
|
||||
@ -756,8 +754,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/ledgerwatch/lmdb-go v1.17.4 h1:dDgPXUrzFWG/EB3RwOKZ+P3XGAlbsZxmVahjc+qWwyA=
|
||||
github.com/ledgerwatch/lmdb-go v1.17.4/go.mod h1:NKRpCxksoTQPyxsUcBiVOe0135uqnJsnf6cElxmOL0o=
|
||||
github.com/lib/pq v1.7.0 h1:h93mCPfUSkaul3Ka/VG8uZdmW1uMHDGxzu0NWHuJmHY=
|
||||
github.com/lib/pq v1.7.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
|
||||
github.com/libp2p/go-addr-util v0.0.1/go.mod h1:4ac6O7n9rIAKB1dnd+s8IbbMXkt+oBpzX4/+RACcnlQ=
|
||||
@ -1323,8 +1319,6 @@ github.com/prometheus/procfs v0.1.0 h1:jhMy6QXfi3y2HEzFoyuCj40z4OZIIHHPtFyCMftmv
|
||||
github.com/prometheus/procfs v0.1.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/raulk/clock v1.1.0 h1:dpb29+UKMbLqiU/jqIJptgLR1nn23HLgMY0sTCDza5Y=
|
||||
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
|
||||
github.com/raulk/go-bs-tests v0.0.4 h1:gYUYmIFMBnp2mtZQuiP/ZGtSTSPvmDBjWBz0xTZz4X8=
|
||||
github.com/raulk/go-bs-tests v0.0.4/go.mod h1:ZREaOSaReTvV4nY7Qh6Lkl+QisYXNBWcPRa0gjrIaG4=
|
||||
github.com/raulk/go-watchdog v1.0.1 h1:qgm3DIJAeb+2byneLrQJ7kvmDLGxN2vy3apXyGaDKN4=
|
||||
github.com/raulk/go-watchdog v1.0.1/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
|
@ -613,8 +613,6 @@ func Repo(r repo.Repo) Option {
|
||||
If(cfg.EnableSplitstore,
|
||||
If(cfg.Splitstore.GetHotStoreType() == "badger",
|
||||
Override(new(dtypes.HotBlockstore), modules.BadgerHotBlockstore)),
|
||||
If(cfg.Splitstore.GetHotStoreType() == "lmdb",
|
||||
Override(new(dtypes.HotBlockstore), modules.LMDBHotBlockstore)),
|
||||
Override(new(dtypes.SplitBlockstore), modules.SplitBlockstore(cfg)),
|
||||
Override(new(dtypes.ChainBlockstore), modules.ChainSplitBlockstore),
|
||||
Override(new(dtypes.StateBlockstore), modules.StateSplitBlockstore),
|
||||
|
@ -5,15 +5,13 @@ import (
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
lmdbbs "github.com/filecoin-project/go-bs-lmdb"
|
||||
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
bstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
"go.uber.org/fx"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
badgerbs "github.com/filecoin-project/lotus/blockstore/badger"
|
||||
"github.com/filecoin-project/lotus/chain/store/splitstore"
|
||||
"github.com/filecoin-project/lotus/node/config"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
@ -38,34 +36,6 @@ func UniversalBlockstore(lc fx.Lifecycle, mctx helpers.MetricsCtx, r repo.Locked
|
||||
return bs, err
|
||||
}
|
||||
|
||||
func LMDBHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
|
||||
path, err := r.SplitstorePath()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
path = filepath.Join(path, "hot.lmdb")
|
||||
bs, err := lmdbbs.Open(&lmdbbs.Options{
|
||||
Path: path,
|
||||
InitialMmapSize: 4 << 30, // 4GiB.
|
||||
MmapGrowthStepFactor: 1.25, // scale slower than the default of 1.5
|
||||
MmapGrowthStepMax: 4 << 30, // 4GiB
|
||||
RetryDelay: 10 * time.Microsecond,
|
||||
MaxReaders: 1024,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lc.Append(fx.Hook{
|
||||
OnStop: func(_ context.Context) error {
|
||||
return bs.Close()
|
||||
}})
|
||||
|
||||
hot := blockstore.WrapIDStore(bs)
|
||||
return hot, err
|
||||
}
|
||||
|
||||
func BadgerHotBlockstore(lc fx.Lifecycle, r repo.LockedRepo) (dtypes.HotBlockstore, error) {
|
||||
path, err := r.SplitstorePath()
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user