refactor lmdb specific snoop/liveset code into their own files

paves the way for different back ends
This commit is contained in:
vyzo 2021-02-26 20:54:47 +02:00
parent 99c7d8e3eb
commit ee751f88cd
7 changed files with 278 additions and 260 deletions

View File

@ -1,110 +1,13 @@
package splitstore
import (
"os"
"golang.org/x/xerrors"
"github.com/ledgerwatch/lmdb-go/lmdb"
cid "github.com/ipfs/go-cid"
)
var LiveSetMapSize int64 = 1 << 34 // 16G; TODO this may be a little too big, we should figure out how to gradually grow the map.
type LiveSet interface {
Mark(cid.Cid) error
Has(cid.Cid) (bool, error)
Close() error
}
type liveSet struct {
env *lmdb.Env
db lmdb.DBI
}
var markBytes = []byte{}
func NewLiveSetEnv(path string) (*lmdb.Env, error) {
env, err := lmdb.NewEnv()
if err != nil {
return nil, xerrors.Errorf("failed to initialize LDMB env: %w", err)
}
if err = env.SetMapSize(LiveSetMapSize); err != nil {
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
}
if err = env.SetMaxDBs(2); err != nil {
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
}
// if err = env.SetMaxReaders(1); err != nil {
// return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err)
// }
if st, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
}
} else if err != nil {
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
} else if !st.IsDir() {
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
}
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
if err != nil {
env.Close() //nolint:errcheck
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
}
return env, nil
}
func NewLiveSet(env *lmdb.Env, name string) (LiveSet, error) {
var db lmdb.DBI
err := env.Update(func(txn *lmdb.Txn) (err error) {
db, err = txn.CreateDBI(name)
return
})
if err != nil {
return nil, err
}
return &liveSet{env: env, db: db}, nil
}
func (s *liveSet) Mark(cid cid.Cid) error {
return s.env.Update(func(txn *lmdb.Txn) error {
err := txn.Put(s.db, cid.Hash(), markBytes, 0)
if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) {
return nil
}
return err
})
}
func (s *liveSet) Has(cid cid.Cid) (has bool, err error) {
err = s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
_, err := txn.Get(s.db, cid.Hash())
if err != nil {
if lmdb.IsNotFound(err) {
has = false
return nil
}
return err
}
has = true
return nil
})
return
}
func (s *liveSet) Close() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Drop(s.db, true)
})
}

View File

@ -0,0 +1,104 @@
package splitstore
import (
"os"
"golang.org/x/xerrors"
"github.com/ledgerwatch/lmdb-go/lmdb"
cid "github.com/ipfs/go-cid"
)
var LMDBLiveSetMapSize int64 = 1 << 34 // 16G; TODO grow the map dynamically
type LMDBLiveSet struct {
env *lmdb.Env
db lmdb.DBI
}
var _ LiveSet = (*LMDBLiveSet)(nil)
func NewLMDBLiveSetEnv(path string) (*lmdb.Env, error) {
env, err := lmdb.NewEnv()
if err != nil {
return nil, xerrors.Errorf("failed to initialize LDMB env: %w", err)
}
if err = env.SetMapSize(LMDBLiveSetMapSize); err != nil {
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
}
if err = env.SetMaxDBs(2); err != nil {
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
}
// if err = env.SetMaxReaders(1); err != nil {
// return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err)
// }
if st, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
}
} else if err != nil {
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
} else if !st.IsDir() {
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
}
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
if err != nil {
env.Close() //nolint:errcheck
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
}
return env, nil
}
func NewLMDBLiveSet(env *lmdb.Env, name string) (*LMDBLiveSet, error) {
var db lmdb.DBI
err := env.Update(func(txn *lmdb.Txn) (err error) {
db, err = txn.CreateDBI(name)
return
})
if err != nil {
return nil, err
}
return &LMDBLiveSet{env: env, db: db}, nil
}
func (s *LMDBLiveSet) Mark(cid cid.Cid) error {
return s.env.Update(func(txn *lmdb.Txn) error {
err := txn.Put(s.db, cid.Hash(), markBytes, 0)
if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) {
return nil
}
return err
})
}
func (s *LMDBLiveSet) Has(cid cid.Cid) (has bool, err error) {
err = s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
_, err := txn.Get(s.db, cid.Hash())
if err != nil {
if lmdb.IsNotFound(err) {
has = false
return nil
}
return err
}
has = true
return nil
})
return
}
func (s *LMDBLiveSet) Close() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Drop(s.db, true)
})
}

View File

@ -37,12 +37,12 @@ func TestLiveSet(t *testing.T) {
t.Fatal(err)
}
hotSet, err := NewLiveSet(env, "hot")
hotSet, err := NewLMDBLiveSet(env, "hot")
if err != nil {
t.Fatal(err)
}
coldSet, err := NewLiveSet(env, "cold")
coldSet, err := NewLMDBLiveSet(env, "cold")
if err != nil {
t.Fatal(err)
}
@ -109,12 +109,12 @@ func TestLiveSet(t *testing.T) {
t.Fatal(err)
}
hotSet, err = NewLiveSet(env, "hot")
hotSet, err = NewLMDBLiveSet(env, "hot")
if err != nil {
t.Fatal(err)
}
coldSet, err = NewLiveSet(env, "cold")
coldSet, err = NewLMDBLiveSet(env, "cold")
if err != nil {
t.Fatal(err)
}

View File

@ -1,19 +1,10 @@
package splitstore
import (
"os"
"golang.org/x/xerrors"
"github.com/ledgerwatch/lmdb-go/lmdb"
cid "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
cid "github.com/ipfs/go-cid"
)
var TrackingStoreMapSize int64 = 1 << 34 // 16G; TODO this may be a little too big, we should figure out how to gradually grow the map.
type TrackingStore interface {
Put(cid.Cid, abi.ChainEpoch) error
PutBatch([]cid.Cid, abi.ChainEpoch) error
@ -22,143 +13,3 @@ type TrackingStore interface {
ForEach(func(cid.Cid, abi.ChainEpoch) error) error
Close() error
}
type trackingStore struct {
env *lmdb.Env
db lmdb.DBI
}
func NewTrackingStore(path string) (TrackingStore, error) {
env, err := lmdb.NewEnv()
if err != nil {
return nil, xerrors.Errorf("failed to initialize LMDB env: %w", err)
}
if err = env.SetMapSize(TrackingStoreMapSize); err != nil {
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
}
if err = env.SetMaxDBs(1); err != nil {
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
}
if st, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
}
} else if err != nil {
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
} else if !st.IsDir() {
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
}
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
if err != nil {
env.Close() //nolint:errcheck
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
}
s := new(trackingStore)
s.env = env
err = env.Update(func(txn *lmdb.Txn) (err error) {
s.db, err = txn.CreateDBI("snoop")
return err
})
if err != nil {
return nil, xerrors.Errorf("error creating tracking store: %w", err)
}
return s, nil
}
func (s *trackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Put(s.db, cid.Hash(), val, 0)
})
})
}
func (s *trackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
for _, cid := range cids {
err := txn.Put(s.db, cid.Hash(), val, 0)
if err != nil {
return err
}
}
return nil
})
})
}
func (s *trackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
err = withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
val, err := txn.Get(s.db, cid.Hash())
if err != nil {
return err
}
epoch = bytesToEpoch(val)
return nil
})
})
return
}
func (s *trackingStore) Delete(cid cid.Cid) error {
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Del(s.db, cid.Hash(), nil)
})
})
}
func (s *trackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
return withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
cur, err := txn.OpenCursor(s.db)
if err != nil {
return err
}
defer cur.Close()
for {
k, v, err := cur.Get(nil, nil, lmdb.Next)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
return err
}
cid := cid.NewCidV1(cid.Raw, k)
epoch := bytesToEpoch(v)
err = f(cid, epoch)
if err != nil {
return err
}
}
})
})
}
func (s *trackingStore) Close() error {
s.env.CloseDBI(s.db)
return s.env.Close()
}

View File

@ -0,0 +1,157 @@
package splitstore
import (
"os"
"golang.org/x/xerrors"
"github.com/ledgerwatch/lmdb-go/lmdb"
cid "github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
)
var LMDBTrackingStoreMapSize int64 = 1 << 34 // 16G -- TODO grow the map dynamically
type LMDBTrackingStore struct {
env *lmdb.Env
db lmdb.DBI
}
var _ TrackingStore = (*LMDBTrackingStore)(nil)
func NewLMDBTrackingStore(path string) (TrackingStore, error) {
env, err := lmdb.NewEnv()
if err != nil {
return nil, xerrors.Errorf("failed to initialize LMDB env: %w", err)
}
if err = env.SetMapSize(LMDBTrackingStoreMapSize); err != nil {
return nil, xerrors.Errorf("failed to set LMDB map size: %w", err)
}
if err = env.SetMaxDBs(1); err != nil {
return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err)
}
if st, err := os.Stat(path); os.IsNotExist(err) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err)
}
} else if err != nil {
return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err)
} else if !st.IsDir() {
return nil, xerrors.Errorf("LMDB path is not a directory %s", path)
}
err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777)
if err != nil {
env.Close() //nolint:errcheck
return nil, xerrors.Errorf("error opening LMDB database: %w", err)
}
s := new(LMDBTrackingStore)
s.env = env
err = env.Update(func(txn *lmdb.Txn) (err error) {
s.db, err = txn.CreateDBI("snoop")
return err
})
if err != nil {
return nil, xerrors.Errorf("error creating tracking store: %w", err)
}
return s, nil
}
func (s *LMDBTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Put(s.db, cid.Hash(), val, 0)
})
})
}
func (s *LMDBTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
val := epochToBytes(epoch)
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
for _, cid := range cids {
err := txn.Put(s.db, cid.Hash(), val, 0)
if err != nil {
return err
}
}
return nil
})
})
}
func (s *LMDBTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) {
err = withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
val, err := txn.Get(s.db, cid.Hash())
if err != nil {
return err
}
epoch = bytesToEpoch(val)
return nil
})
})
return
}
func (s *LMDBTrackingStore) Delete(cid cid.Cid) error {
return withMaxReadersRetry(
func() error {
return s.env.Update(func(txn *lmdb.Txn) error {
return txn.Del(s.db, cid.Hash(), nil)
})
})
}
func (s *LMDBTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
return withMaxReadersRetry(
func() error {
return s.env.View(func(txn *lmdb.Txn) error {
txn.RawRead = true
cur, err := txn.OpenCursor(s.db)
if err != nil {
return err
}
defer cur.Close()
for {
k, v, err := cur.Get(nil, nil, lmdb.Next)
if err != nil {
if lmdb.IsNotFound(err) {
return nil
}
return err
}
cid := cid.NewCidV1(cid.Raw, k)
epoch := bytesToEpoch(v)
err = f(cid, epoch)
if err != nil {
return err
}
}
})
})
}
func (s *LMDBTrackingStore) Close() error {
s.env.CloseDBI(s.db)
return s.env.Close()
}

View File

@ -1,7 +1,6 @@
package splitstore
import (
"context"
"testing"
"golang.org/x/xerrors"
@ -46,7 +45,7 @@ func TestTrackingStore(t *testing.T) {
}
}
s, err := NewTrackingStore("/tmp/snoop-test")
s, err := NewLMDBTrackingStore("/tmp/snoop-test")
if err != nil {
t.Fatal(err)
}
@ -89,14 +88,18 @@ func TestTrackingStore(t *testing.T) {
k4.String(): {},
}
ch, _ := s.Keys(context.Background()) //nolint:errcheck
for k := range ch {
err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error {
_, ok := allKeys[k.String()]
if !ok {
t.Fatal("unexpected key")
}
delete(allKeys, k.String())
return nil
})
if err != nil {
t.Fatal(err)
}
if len(allKeys) != 0 {
@ -109,7 +112,7 @@ func TestTrackingStore(t *testing.T) {
t.Fatal(err)
}
s, err = NewTrackingStore("/tmp/snoop-test")
s, err = NewLMDBTrackingStore("/tmp/snoop-test")
if err != nil {
t.Fatal(err)
}

View File

@ -67,13 +67,13 @@ var _ bstore.Blockstore = (*SplitStore)(nil)
// compaction.
func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore) (*SplitStore, error) {
// the tracking store
snoop, err := NewTrackingStore(filepath.Join(path, "snoop.db"))
snoop, err := NewLMDBTrackingStore(filepath.Join(path, "snoop.lmdb"))
if err != nil {
return nil, err
}
// the liveset env
env, err := NewLiveSetEnv(filepath.Join(path, "sweep.db"))
env, err := NewLMDBLiveSetEnv(filepath.Join(path, "sweep.lmdb"))
if err != nil {
snoop.Close() //nolint:errcheck
return nil, err
@ -322,14 +322,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error {
func (s *SplitStore) compact() {
// create two on disk live sets, one for marking the cold finality region
// and one for marking the hot region
hotSet, err := NewLiveSet(s.env, "hot")
hotSet, err := NewLMDBLiveSet(s.env, "hot")
if err != nil {
// TODO do something better here
panic(err)
}
defer hotSet.Close() //nolint:errcheck
coldSet, err := NewLiveSet(s.env, "cold")
coldSet, err := NewLMDBLiveSet(s.env, "cold")
if err != nil {
// TODO do something better here
panic(err)