From ee751f88cdee736e0ec1e89637ae1b4d1a9fa20c Mon Sep 17 00:00:00 2001 From: vyzo Date: Fri, 26 Feb 2021 20:54:47 +0200 Subject: [PATCH] refactor lmdb specific snoop/liveset code into their own files paves the way for different back ends --- chain/store/splitstore/liveset.go | 97 --------------- chain/store/splitstore/liveset_lmdb.go | 104 ++++++++++++++++ chain/store/splitstore/liveset_test.go | 8 +- chain/store/splitstore/snoop.go | 151 +----------------------- chain/store/splitstore/snoop_lmdb.go | 157 +++++++++++++++++++++++++ chain/store/splitstore/snoop_test.go | 13 +- chain/store/splitstore/splitstore.go | 8 +- 7 files changed, 278 insertions(+), 260 deletions(-) create mode 100644 chain/store/splitstore/liveset_lmdb.go create mode 100644 chain/store/splitstore/snoop_lmdb.go diff --git a/chain/store/splitstore/liveset.go b/chain/store/splitstore/liveset.go index 3fea285d0..5a7c8e6f2 100644 --- a/chain/store/splitstore/liveset.go +++ b/chain/store/splitstore/liveset.go @@ -1,110 +1,13 @@ package splitstore import ( - "os" - - "golang.org/x/xerrors" - - "github.com/ledgerwatch/lmdb-go/lmdb" - cid "github.com/ipfs/go-cid" ) -var LiveSetMapSize int64 = 1 << 34 // 16G; TODO this may be a little too big, we should figure out how to gradually grow the map. - type LiveSet interface { Mark(cid.Cid) error Has(cid.Cid) (bool, error) Close() error } -type liveSet struct { - env *lmdb.Env - db lmdb.DBI -} - var markBytes = []byte{} - -func NewLiveSetEnv(path string) (*lmdb.Env, error) { - env, err := lmdb.NewEnv() - if err != nil { - return nil, xerrors.Errorf("failed to initialize LDMB env: %w", err) - } - if err = env.SetMapSize(LiveSetMapSize); err != nil { - return nil, xerrors.Errorf("failed to set LMDB map size: %w", err) - } - if err = env.SetMaxDBs(2); err != nil { - return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err) - } - // if err = env.SetMaxReaders(1); err != nil { - // return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err) - // } - - if st, err := os.Stat(path); os.IsNotExist(err) { - if err := os.MkdirAll(path, 0777); err != nil { - return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err) - } - } else if err != nil { - return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err) - } else if !st.IsDir() { - return nil, xerrors.Errorf("LMDB path is not a directory %s", path) - } - err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777) - if err != nil { - env.Close() //nolint:errcheck - return nil, xerrors.Errorf("error opening LMDB database: %w", err) - } - - return env, nil -} - -func NewLiveSet(env *lmdb.Env, name string) (LiveSet, error) { - var db lmdb.DBI - err := env.Update(func(txn *lmdb.Txn) (err error) { - db, err = txn.CreateDBI(name) - return - }) - - if err != nil { - return nil, err - } - - return &liveSet{env: env, db: db}, nil -} - -func (s *liveSet) Mark(cid cid.Cid) error { - return s.env.Update(func(txn *lmdb.Txn) error { - err := txn.Put(s.db, cid.Hash(), markBytes, 0) - if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) { - return nil - } - return err - }) -} - -func (s *liveSet) Has(cid cid.Cid) (has bool, err error) { - err = s.env.View(func(txn *lmdb.Txn) error { - txn.RawRead = true - - _, err := txn.Get(s.db, cid.Hash()) - if err != nil { - if lmdb.IsNotFound(err) { - has = false - return nil - } - - return err - } - - has = true - return nil - }) - - return -} - -func (s *liveSet) Close() error { - return s.env.Update(func(txn *lmdb.Txn) error { - return txn.Drop(s.db, true) - }) -} diff --git a/chain/store/splitstore/liveset_lmdb.go b/chain/store/splitstore/liveset_lmdb.go new file mode 100644 index 000000000..3eb44dbe1 --- /dev/null +++ b/chain/store/splitstore/liveset_lmdb.go @@ -0,0 +1,104 @@ +package splitstore + +import ( + "os" + + "golang.org/x/xerrors" + + "github.com/ledgerwatch/lmdb-go/lmdb" + + cid "github.com/ipfs/go-cid" +) + +var LMDBLiveSetMapSize int64 = 1 << 34 // 16G; TODO grow the map dynamically + +type LMDBLiveSet struct { + env *lmdb.Env + db lmdb.DBI +} + +var _ LiveSet = (*LMDBLiveSet)(nil) + +func NewLMDBLiveSetEnv(path string) (*lmdb.Env, error) { + env, err := lmdb.NewEnv() + if err != nil { + return nil, xerrors.Errorf("failed to initialize LDMB env: %w", err) + } + if err = env.SetMapSize(LMDBLiveSetMapSize); err != nil { + return nil, xerrors.Errorf("failed to set LMDB map size: %w", err) + } + if err = env.SetMaxDBs(2); err != nil { + return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err) + } + // if err = env.SetMaxReaders(1); err != nil { + // return nil, xerrors.Errorf("failed to set LMDB max readers: %w", err) + // } + + if st, err := os.Stat(path); os.IsNotExist(err) { + if err := os.MkdirAll(path, 0777); err != nil { + return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err) + } + } else if err != nil { + return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err) + } else if !st.IsDir() { + return nil, xerrors.Errorf("LMDB path is not a directory %s", path) + } + err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777) + if err != nil { + env.Close() //nolint:errcheck + return nil, xerrors.Errorf("error opening LMDB database: %w", err) + } + + return env, nil +} + +func NewLMDBLiveSet(env *lmdb.Env, name string) (*LMDBLiveSet, error) { + var db lmdb.DBI + err := env.Update(func(txn *lmdb.Txn) (err error) { + db, err = txn.CreateDBI(name) + return + }) + + if err != nil { + return nil, err + } + + return &LMDBLiveSet{env: env, db: db}, nil +} + +func (s *LMDBLiveSet) Mark(cid cid.Cid) error { + return s.env.Update(func(txn *lmdb.Txn) error { + err := txn.Put(s.db, cid.Hash(), markBytes, 0) + if err == nil || lmdb.IsErrno(err, lmdb.KeyExist) { + return nil + } + return err + }) +} + +func (s *LMDBLiveSet) Has(cid cid.Cid) (has bool, err error) { + err = s.env.View(func(txn *lmdb.Txn) error { + txn.RawRead = true + + _, err := txn.Get(s.db, cid.Hash()) + if err != nil { + if lmdb.IsNotFound(err) { + has = false + return nil + } + + return err + } + + has = true + return nil + }) + + return +} + +func (s *LMDBLiveSet) Close() error { + return s.env.Update(func(txn *lmdb.Txn) error { + return txn.Drop(s.db, true) + }) +} diff --git a/chain/store/splitstore/liveset_test.go b/chain/store/splitstore/liveset_test.go index 449b3c92f..7d03e873a 100644 --- a/chain/store/splitstore/liveset_test.go +++ b/chain/store/splitstore/liveset_test.go @@ -37,12 +37,12 @@ func TestLiveSet(t *testing.T) { t.Fatal(err) } - hotSet, err := NewLiveSet(env, "hot") + hotSet, err := NewLMDBLiveSet(env, "hot") if err != nil { t.Fatal(err) } - coldSet, err := NewLiveSet(env, "cold") + coldSet, err := NewLMDBLiveSet(env, "cold") if err != nil { t.Fatal(err) } @@ -109,12 +109,12 @@ func TestLiveSet(t *testing.T) { t.Fatal(err) } - hotSet, err = NewLiveSet(env, "hot") + hotSet, err = NewLMDBLiveSet(env, "hot") if err != nil { t.Fatal(err) } - coldSet, err = NewLiveSet(env, "cold") + coldSet, err = NewLMDBLiveSet(env, "cold") if err != nil { t.Fatal(err) } diff --git a/chain/store/splitstore/snoop.go b/chain/store/splitstore/snoop.go index b260c2008..a762be79c 100644 --- a/chain/store/splitstore/snoop.go +++ b/chain/store/splitstore/snoop.go @@ -1,19 +1,10 @@ package splitstore import ( - "os" - - "golang.org/x/xerrors" - - "github.com/ledgerwatch/lmdb-go/lmdb" - - cid "github.com/ipfs/go-cid" - "github.com/filecoin-project/go-state-types/abi" + cid "github.com/ipfs/go-cid" ) -var TrackingStoreMapSize int64 = 1 << 34 // 16G; TODO this may be a little too big, we should figure out how to gradually grow the map. - type TrackingStore interface { Put(cid.Cid, abi.ChainEpoch) error PutBatch([]cid.Cid, abi.ChainEpoch) error @@ -22,143 +13,3 @@ type TrackingStore interface { ForEach(func(cid.Cid, abi.ChainEpoch) error) error Close() error } - -type trackingStore struct { - env *lmdb.Env - db lmdb.DBI -} - -func NewTrackingStore(path string) (TrackingStore, error) { - env, err := lmdb.NewEnv() - if err != nil { - return nil, xerrors.Errorf("failed to initialize LMDB env: %w", err) - } - if err = env.SetMapSize(TrackingStoreMapSize); err != nil { - return nil, xerrors.Errorf("failed to set LMDB map size: %w", err) - } - if err = env.SetMaxDBs(1); err != nil { - return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err) - } - - if st, err := os.Stat(path); os.IsNotExist(err) { - if err := os.MkdirAll(path, 0777); err != nil { - return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err) - } - } else if err != nil { - return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err) - } else if !st.IsDir() { - return nil, xerrors.Errorf("LMDB path is not a directory %s", path) - } - - err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777) - if err != nil { - env.Close() //nolint:errcheck - return nil, xerrors.Errorf("error opening LMDB database: %w", err) - } - - s := new(trackingStore) - s.env = env - err = env.Update(func(txn *lmdb.Txn) (err error) { - s.db, err = txn.CreateDBI("snoop") - return err - }) - - if err != nil { - return nil, xerrors.Errorf("error creating tracking store: %w", err) - } - - return s, nil -} - -func (s *trackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { - val := epochToBytes(epoch) - return withMaxReadersRetry( - func() error { - return s.env.Update(func(txn *lmdb.Txn) error { - return txn.Put(s.db, cid.Hash(), val, 0) - }) - }) -} - -func (s *trackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { - val := epochToBytes(epoch) - return withMaxReadersRetry( - func() error { - return s.env.Update(func(txn *lmdb.Txn) error { - for _, cid := range cids { - err := txn.Put(s.db, cid.Hash(), val, 0) - if err != nil { - return err - } - } - - return nil - }) - }) -} - -func (s *trackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) { - err = withMaxReadersRetry( - func() error { - return s.env.View(func(txn *lmdb.Txn) error { - txn.RawRead = true - - val, err := txn.Get(s.db, cid.Hash()) - if err != nil { - return err - } - - epoch = bytesToEpoch(val) - return nil - }) - }) - - return -} - -func (s *trackingStore) Delete(cid cid.Cid) error { - return withMaxReadersRetry( - func() error { - return s.env.Update(func(txn *lmdb.Txn) error { - return txn.Del(s.db, cid.Hash(), nil) - }) - }) -} - -func (s *trackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { - return withMaxReadersRetry( - func() error { - return s.env.View(func(txn *lmdb.Txn) error { - txn.RawRead = true - cur, err := txn.OpenCursor(s.db) - if err != nil { - return err - } - defer cur.Close() - - for { - k, v, err := cur.Get(nil, nil, lmdb.Next) - if err != nil { - if lmdb.IsNotFound(err) { - return nil - } - - return err - } - - cid := cid.NewCidV1(cid.Raw, k) - epoch := bytesToEpoch(v) - - err = f(cid, epoch) - if err != nil { - return err - } - } - }) - }) -} - -func (s *trackingStore) Close() error { - s.env.CloseDBI(s.db) - return s.env.Close() -} diff --git a/chain/store/splitstore/snoop_lmdb.go b/chain/store/splitstore/snoop_lmdb.go new file mode 100644 index 000000000..08c139f52 --- /dev/null +++ b/chain/store/splitstore/snoop_lmdb.go @@ -0,0 +1,157 @@ +package splitstore + +import ( + "os" + + "golang.org/x/xerrors" + + "github.com/ledgerwatch/lmdb-go/lmdb" + + cid "github.com/ipfs/go-cid" + + "github.com/filecoin-project/go-state-types/abi" +) + +var LMDBTrackingStoreMapSize int64 = 1 << 34 // 16G -- TODO grow the map dynamically + +type LMDBTrackingStore struct { + env *lmdb.Env + db lmdb.DBI +} + +var _ TrackingStore = (*LMDBTrackingStore)(nil) + +func NewLMDBTrackingStore(path string) (TrackingStore, error) { + env, err := lmdb.NewEnv() + if err != nil { + return nil, xerrors.Errorf("failed to initialize LMDB env: %w", err) + } + if err = env.SetMapSize(LMDBTrackingStoreMapSize); err != nil { + return nil, xerrors.Errorf("failed to set LMDB map size: %w", err) + } + if err = env.SetMaxDBs(1); err != nil { + return nil, xerrors.Errorf("failed to set LMDB max dbs: %w", err) + } + + if st, err := os.Stat(path); os.IsNotExist(err) { + if err := os.MkdirAll(path, 0777); err != nil { + return nil, xerrors.Errorf("failed to create LMDB data directory at %s: %w", path, err) + } + } else if err != nil { + return nil, xerrors.Errorf("failed to stat LMDB data dir: %w", err) + } else if !st.IsDir() { + return nil, xerrors.Errorf("LMDB path is not a directory %s", path) + } + + err = env.Open(path, lmdb.NoSync|lmdb.WriteMap|lmdb.MapAsync|lmdb.NoReadahead, 0777) + if err != nil { + env.Close() //nolint:errcheck + return nil, xerrors.Errorf("error opening LMDB database: %w", err) + } + + s := new(LMDBTrackingStore) + s.env = env + err = env.Update(func(txn *lmdb.Txn) (err error) { + s.db, err = txn.CreateDBI("snoop") + return err + }) + + if err != nil { + return nil, xerrors.Errorf("error creating tracking store: %w", err) + } + + return s, nil +} + +func (s *LMDBTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error { + val := epochToBytes(epoch) + return withMaxReadersRetry( + func() error { + return s.env.Update(func(txn *lmdb.Txn) error { + return txn.Put(s.db, cid.Hash(), val, 0) + }) + }) +} + +func (s *LMDBTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error { + val := epochToBytes(epoch) + return withMaxReadersRetry( + func() error { + return s.env.Update(func(txn *lmdb.Txn) error { + for _, cid := range cids { + err := txn.Put(s.db, cid.Hash(), val, 0) + if err != nil { + return err + } + } + + return nil + }) + }) +} + +func (s *LMDBTrackingStore) Get(cid cid.Cid) (epoch abi.ChainEpoch, err error) { + err = withMaxReadersRetry( + func() error { + return s.env.View(func(txn *lmdb.Txn) error { + txn.RawRead = true + + val, err := txn.Get(s.db, cid.Hash()) + if err != nil { + return err + } + + epoch = bytesToEpoch(val) + return nil + }) + }) + + return +} + +func (s *LMDBTrackingStore) Delete(cid cid.Cid) error { + return withMaxReadersRetry( + func() error { + return s.env.Update(func(txn *lmdb.Txn) error { + return txn.Del(s.db, cid.Hash(), nil) + }) + }) +} + +func (s *LMDBTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error { + return withMaxReadersRetry( + func() error { + return s.env.View(func(txn *lmdb.Txn) error { + txn.RawRead = true + cur, err := txn.OpenCursor(s.db) + if err != nil { + return err + } + defer cur.Close() + + for { + k, v, err := cur.Get(nil, nil, lmdb.Next) + if err != nil { + if lmdb.IsNotFound(err) { + return nil + } + + return err + } + + cid := cid.NewCidV1(cid.Raw, k) + epoch := bytesToEpoch(v) + + err = f(cid, epoch) + if err != nil { + return err + } + } + }) + }) +} + +func (s *LMDBTrackingStore) Close() error { + s.env.CloseDBI(s.db) + return s.env.Close() +} diff --git a/chain/store/splitstore/snoop_test.go b/chain/store/splitstore/snoop_test.go index 816fdbdb7..225c5936b 100644 --- a/chain/store/splitstore/snoop_test.go +++ b/chain/store/splitstore/snoop_test.go @@ -1,7 +1,6 @@ package splitstore import ( - "context" "testing" "golang.org/x/xerrors" @@ -46,7 +45,7 @@ func TestTrackingStore(t *testing.T) { } } - s, err := NewTrackingStore("/tmp/snoop-test") + s, err := NewLMDBTrackingStore("/tmp/snoop-test") if err != nil { t.Fatal(err) } @@ -89,14 +88,18 @@ func TestTrackingStore(t *testing.T) { k4.String(): {}, } - ch, _ := s.Keys(context.Background()) //nolint:errcheck - for k := range ch { + err = s.ForEach(func(k cid.Cid, _ abi.ChainEpoch) error { _, ok := allKeys[k.String()] if !ok { t.Fatal("unexpected key") } delete(allKeys, k.String()) + return nil + }) + + if err != nil { + t.Fatal(err) } if len(allKeys) != 0 { @@ -109,7 +112,7 @@ func TestTrackingStore(t *testing.T) { t.Fatal(err) } - s, err = NewTrackingStore("/tmp/snoop-test") + s, err = NewLMDBTrackingStore("/tmp/snoop-test") if err != nil { t.Fatal(err) } diff --git a/chain/store/splitstore/splitstore.go b/chain/store/splitstore/splitstore.go index 71f271b82..82ffc1feb 100644 --- a/chain/store/splitstore/splitstore.go +++ b/chain/store/splitstore/splitstore.go @@ -67,13 +67,13 @@ var _ bstore.Blockstore = (*SplitStore)(nil) // compaction. func NewSplitStore(path string, ds dstore.Datastore, cold, hot bstore.Blockstore) (*SplitStore, error) { // the tracking store - snoop, err := NewTrackingStore(filepath.Join(path, "snoop.db")) + snoop, err := NewLMDBTrackingStore(filepath.Join(path, "snoop.lmdb")) if err != nil { return nil, err } // the liveset env - env, err := NewLiveSetEnv(filepath.Join(path, "sweep.db")) + env, err := NewLMDBLiveSetEnv(filepath.Join(path, "sweep.lmdb")) if err != nil { snoop.Close() //nolint:errcheck return nil, err @@ -322,14 +322,14 @@ func (s *SplitStore) HeadChange(revert, apply []*types.TipSet) error { func (s *SplitStore) compact() { // create two on disk live sets, one for marking the cold finality region // and one for marking the hot region - hotSet, err := NewLiveSet(s.env, "hot") + hotSet, err := NewLMDBLiveSet(s.env, "hot") if err != nil { // TODO do something better here panic(err) } defer hotSet.Close() //nolint:errcheck - coldSet, err := NewLiveSet(s.env, "cold") + coldSet, err := NewLMDBLiveSet(s.env, "cold") if err != nil { // TODO do something better here panic(err)