From 53e1e989163ffc28ddfe9b3c647a32636c68f598 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 24 Jan 2024 11:31:55 -0500 Subject: [PATCH] refactor(store/v2): Refactor PebbleDB Iterator (#19195) --- store/storage/pebbledb/iterator.go | 273 +++++++++++++++++++--------- store/storage/storage_test_suite.go | 105 ++++++++--- 2 files changed, 272 insertions(+), 106 deletions(-) diff --git a/store/storage/pebbledb/iterator.go b/store/storage/pebbledb/iterator.go index 6bd60a2cfe..6b16805e9d 100644 --- a/store/storage/pebbledb/iterator.go +++ b/store/storage/pebbledb/iterator.go @@ -47,19 +47,6 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte valid = src.First() } - if valid { - // The first key may not represent the desired target version, so seek to - // the correct location by moving the cursor to the first key < version + 1. - firstKey, _, ok := SplitMVCCKey(src.Key()) - if !ok { - // XXX: This should not happen as that would indicate we have a malformed - // MVCC key. - valid = false - } else { - valid = src.SeekLT(MVCCEncode(firstKey, version+1)) - } - } - itr := &iterator{ source: src, prefix: prefix, @@ -70,12 +57,32 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte reverse: reverse, } - // The cursor might now be pointing at a key/value pair that is tombstoned. - // If so, we must move the cursor. - if itr.valid && itr.cursorTombstoned() { - itr.Next() - } + if valid { + currKey, currKeyVersion, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC value. + panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", itr.source.Key())) + } + curKeyVersionDecoded, err := decodeUint64Ascending(currKeyVersion) + if err != nil { + itr.valid = false + return itr + } + + // We need to check whether initial key iterator visits has a version <= requested + // version. 
If larger version, call next to find another key which does. + if curKeyVersionDecoded > itr.version { + itr.Next() + } else { + // If version is less, seek to the largest version of that key <= requested + // iterator version. It is guaranteed this won't move the iterator to a key + // that is invalid since curKeyVersionDecoded <= requested iterator version, + // so there exists at least one version of currKey SeekLT may move to. + itr.valid = itr.source.SeekLT(MVCCEncode(currKey, itr.version+1)) + } + } return itr } @@ -113,73 +120,11 @@ func (itr *iterator) Value() []byte { } func (itr *iterator) Next() { - currKey, _, ok := SplitMVCCKey(itr.source.Key()) - if !ok { - // XXX: This should not happen as that would indicate we have a malformed - // MVCC key. - panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key())) - } - - var next bool if itr.reverse { - // Since PebbleDB has no PrevPrefix API, we must manually seek to the next - // key that is lexicographically less than the current key. - next = itr.source.SeekLT(MVCCEncode(currKey, 0)) + itr.nextReverse() } else { - // move the cursor to the next key - next = itr.source.NextPrefix() + itr.nextForward() } - - // First move the iterator to the next prefix, which may not correspond to the - // desired version for that key, e.g. if the key was written at a later version, - // so we seek back to the latest desired version, s.t. the version <= itr.version. - if next { - nextKey, _, ok := SplitMVCCKey(itr.source.Key()) - if !ok { - // XXX: This should not happen as that would indicate we have a malformed - // MVCC key. - itr.valid = false - return - } - if !bytes.HasPrefix(nextKey, itr.prefix) { - // the next key must have itr.prefix as the prefix - itr.valid = false - return - } - - // Move the iterator to the closest version of the desired version, so we - // append the current iterator key to the prefix and seek to that key. 
- itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1)) - - tmpKey, _, ok := SplitMVCCKey(itr.source.Key()) - if !ok { - // XXX: This should not happen as that would indicate we have a malformed - // MVCC key. - itr.valid = false - return - } - - // There exists cases where the SeekLT() call moved us back to the same key - // we started at, so we must move to next key, i.e. two keys forward. - if bytes.Equal(tmpKey, currKey) { - if itr.source.NextPrefix() { - itr.Next() - } else { - itr.valid = false - return - } - } - - // The cursor might now be pointing at a key/value pair that is tombstoned. - // If so, we must move the cursor. - if itr.valid && itr.cursorTombstoned() { - itr.Next() - } - - return - } - - itr.valid = false } func (itr *iterator) Valid() bool { @@ -315,3 +260,167 @@ func (itr *iterator) DebugRawIterate() { } } } + +func (itr *iterator) nextForward() { + if !itr.source.Valid() { + itr.valid = false + return + } + + currKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key())) + } + + next := itr.source.NextPrefix() + + // First move the iterator to the next prefix, which may not correspond to the + // desired version for that key, e.g. if the key was written at a later version, + // so we seek back to the latest desired version, s.t. the version is <= itr.version. + if next { + nextKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + if !bytes.HasPrefix(nextKey, itr.prefix) { + // the next key must have itr.prefix as the prefix + itr.valid = false + return + } + + // Move the iterator to the closest version to the desired version, so we + // append the current iterator key to the prefix and seek to that key. 
+ itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1)) + + tmpKey, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + // There exist cases where the SeekLT() call moved us back to the same key + // we started at, so we must move to next key, i.e. two keys forward. + if bytes.Equal(tmpKey, currKey) { + if itr.source.NextPrefix() { + itr.nextForward() + + _, tmpKeyVersion, ok = SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + } else { + itr.valid = false + return + } + } + + // We need to verify that every Next call either moves the iterator to a key + // whose version is less than or equal to requested iterator version, or + // exhausts the iterator. + tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion) + if err != nil { + itr.valid = false + return + } + + // If the iterator is at an entry whose version is higher than the requested version, + // call nextForward again. + if tmpKeyVersionDecoded > itr.version { + itr.nextForward() + } + + // The cursor might now be pointing at a key/value pair that is tombstoned. + // If so, we must move the cursor. + if itr.valid && itr.cursorTombstoned() { + itr.nextForward() + } + + return + } + + itr.valid = false +} + +func (itr *iterator) nextReverse() { + if !itr.source.Valid() { + itr.valid = false + return + } + + currKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key())) + } + + next := itr.source.SeekLT(MVCCEncode(currKey, 0)) + + // First move the iterator to the next prefix, which may not correspond to the + // desired version for that key, e.g.
if the key was written at a later version, + // so we seek back to the latest desired version, s.t. the version is <= itr.version. + if next { + nextKey, _, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + if !bytes.HasPrefix(nextKey, itr.prefix) { + // the next key must have itr.prefix as the prefix + itr.valid = false + return + } + + // Move the iterator to the closest version to the desired version, so we + // append the current iterator key to the prefix and seek to that key. + itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1)) + + _, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key()) + if !ok { + // XXX: This should not happen as that would indicate we have a malformed + // MVCC key. + itr.valid = false + return + } + + // We need to verify that every Next call either moves the iterator to a key + // whose version is less than or equal to requested iterator version, or + // exhausts the iterator. + tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion) + if err != nil { + itr.valid = false + return + } + + // If the iterator is at an entry whose version is higher than the requested version, + // call nextReverse again. + if tmpKeyVersionDecoded > itr.version { + itr.nextReverse() + } + + // The cursor might now be pointing at a key/value pair that is tombstoned. + // If so, we must move the cursor.
+ if itr.valid && itr.cursorTombstoned() { + itr.nextReverse() + } + + return + } + + itr.valid = false +} diff --git a/store/storage/storage_test_suite.go b/store/storage/storage_test_suite.go index b43ed40895..28be858d50 100644 --- a/store/storage/storage_test_suite.go +++ b/store/storage/storage_test_suite.go @@ -3,7 +3,9 @@ package storage import ( "fmt" "slices" + "testing" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "cosmossdk.io/store/v2" @@ -402,30 +404,11 @@ func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() { defer db.Close() - cs := store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { - {Key: []byte("keyC"), Value: []byte("value003")}, - }}) - s.Require().NoError(db.ApplyChangeset(58827506, cs)) - - cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { - {Key: []byte("keyE"), Value: []byte("value000")}, - }}) - s.Require().NoError(db.ApplyChangeset(58827506, cs)) - - cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { - {Key: []byte("keyF"), Value: []byte("value000")}, - }}) - s.Require().NoError(db.ApplyChangeset(58827506, cs)) - - cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { - {Key: []byte("keyC"), Value: []byte("value004")}, - }}) - s.Require().NoError(db.ApplyChangeset(58833605, cs)) - - cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: { - {Key: []byte("keyD"), Value: []byte("value006")}, - }}) - s.Require().NoError(db.ApplyChangeset(58833606, cs)) + DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")}) + DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value000")}) + DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value000")}) + DBApplyChangeset(s.T(), db, 58833605, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value004")}) + DBApplyChangeset(s.T(), db, 58833606, 
storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value006")}) itr, err := db.Iterator(storeKey1, 58831525, []byte("key"), nil) s.Require().NoError(err) @@ -439,6 +422,60 @@ func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() { s.Require().Equal(3, len(count)) } +func (s *StorageTestSuite) TestDatabaseIterator_ForwardIteration() { + db, err := s.NewDB(s.T().TempDir()) + s.Require().NoError(err) + defer db.Close() + + DBApplyChangeset(s.T(), db, 8, storeKey1, [][]byte{[]byte("keyA")}, [][]byte{[]byte("value001")}) + DBApplyChangeset(s.T(), db, 9, storeKey1, [][]byte{[]byte("keyB")}, [][]byte{[]byte("value002")}) + DBApplyChangeset(s.T(), db, 10, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")}) + DBApplyChangeset(s.T(), db, 11, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value004")}) + + DBApplyChangeset(s.T(), db, 2, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value007")}) + DBApplyChangeset(s.T(), db, 3, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value008")}) + DBApplyChangeset(s.T(), db, 4, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value009")}) + DBApplyChangeset(s.T(), db, 5, storeKey1, [][]byte{[]byte("keyH")}, [][]byte{[]byte("value010")}) + + itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ")) + s.Require().NoError(err) + + defer itr.Close() + count := 0 + for ; itr.Valid(); itr.Next() { + count++ + } + + s.Require().Equal(4, count) +} + +func (s *StorageTestSuite) TestDatabaseIterator_ForwardIterationHigher() { + db, err := s.NewDB(s.T().TempDir()) + s.Require().NoError(err) + defer db.Close() + + DBApplyChangeset(s.T(), db, 9, storeKey1, [][]byte{[]byte("keyB")}, [][]byte{[]byte("value002")}) + DBApplyChangeset(s.T(), db, 10, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")}) + DBApplyChangeset(s.T(), db, 11, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value004")}) + + DBApplyChangeset(s.T(), db, 12, storeKey1, [][]byte{[]byte("keyD")}, 
[][]byte{[]byte("value007")}) + DBApplyChangeset(s.T(), db, 13, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value008")}) + DBApplyChangeset(s.T(), db, 14, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value009")}) + DBApplyChangeset(s.T(), db, 15, storeKey1, [][]byte{[]byte("keyH")}, [][]byte{[]byte("value010")}) + + itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ")) + s.Require().NoError(err) + + defer itr.Close() + + count := 0 + for ; itr.Valid(); itr.Next() { + count++ + } + + s.Require().Equal(0, count) +} + func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() { db, err := s.NewDB(s.T().TempDir()) s.Require().NoError(err) @@ -593,3 +630,23 @@ func (s *StorageTestSuite) TestDatabase_Prune_KeepRecent() { s.Require().NoError(err) s.Require().Equal([]byte("val200"), bz) } + +func DBApplyChangeset( + t *testing.T, + db store.VersionedDatabase, + version uint64, + storeKey string, + keys, vals [][]byte, +) { + t.Helper() + + require.Greater(t, version, uint64(0)) + require.Equal(t, len(keys), len(vals)) + + cs := store.NewChangeset() + for i := 0; i < len(keys); i++ { + cs.AddKVPair(storeKey, store.KVPair{Key: keys[i], Value: vals[i]}) + } + + require.NoError(t, db.ApplyChangeset(version, cs)) +}