refactor(store/v2): Refactor PebbleDB Iterator (#19195)

This commit is contained in:
Aleksandr Bezobchuk 2024-01-24 11:31:55 -05:00 committed by GitHub
parent 38d56db428
commit 53e1e98916
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 272 additions and 106 deletions

View File

@ -47,19 +47,6 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte
valid = src.First()
}
if valid {
// The first key may not represent the desired target version, so seek to
// the correct location by moving the cursor to the first key < version + 1.
firstKey, _, ok := SplitMVCCKey(src.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
valid = false
} else {
valid = src.SeekLT(MVCCEncode(firstKey, version+1))
}
}
itr := &iterator{
source: src,
prefix: prefix,
@ -70,12 +57,32 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte
reverse: reverse,
}
// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
itr.Next()
}
if valid {
currKey, currKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC value.
panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", itr.source.Key()))
}
curKeyVersionDecoded, err := decodeUint64Ascending(currKeyVersion)
if err != nil {
itr.valid = false
return itr
}
// We need to check whether initial key iterator visits has a version <= requested
// version. If larger version, call next to find another key which does.
if curKeyVersionDecoded > itr.version {
itr.Next()
} else {
// If version is less, seek to the largest version of that key <= requested
// iterator version. It is guaranteed this won't move the iterator to a key
// that is invalid since curKeyVersionDecoded <= requested iterator version,
// so there exists at least one version of currKey SeekLT may move to.
itr.valid = itr.source.SeekLT(MVCCEncode(currKey, itr.version+1))
}
}
return itr
}
@ -113,73 +120,11 @@ func (itr *iterator) Value() []byte {
}
func (itr *iterator) Next() {
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
var next bool
if itr.reverse {
// Since PebbleDB has no PrevPrefix API, we must manually seek to the next
// key that is lexicographically less than the current key.
next = itr.source.SeekLT(MVCCEncode(currKey, 0))
itr.nextReverse()
} else {
// move the cursor to the next key
next = itr.source.NextPrefix()
itr.nextForward()
}
// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
if !bytes.HasPrefix(nextKey, itr.prefix) {
// the next key must have itr.prefix as the prefix
itr.valid = false
return
}
// Move the iterator to the closest version of the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
tmpKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
// There exists cases where the SeekLT() call moved us back to the same key
// we started at, so we must move to next key, i.e. two keys forward.
if bytes.Equal(tmpKey, currKey) {
if itr.source.NextPrefix() {
itr.Next()
} else {
itr.valid = false
return
}
}
// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
itr.Next()
}
return
}
itr.valid = false
}
func (itr *iterator) Valid() bool {
@ -315,3 +260,167 @@ func (itr *iterator) DebugRawIterate() {
}
}
}
func (itr *iterator) nextForward() {
if !itr.source.Valid() {
itr.valid = false
return
}
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
next := itr.source.NextPrefix()
// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
if !bytes.HasPrefix(nextKey, itr.prefix) {
// the next key must have itr.prefix as the prefix
itr.valid = false
return
}
// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
tmpKey, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
// There exists cases where the SeekLT() call moved us back to the same key
// we started at, so we must move to next key, i.e. two keys forward.
if bytes.Equal(tmpKey, currKey) {
if itr.source.NextPrefix() {
itr.nextForward()
_, tmpKeyVersion, ok = SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
} else {
itr.valid = false
return
}
}
// We need to verify that every Next call either moves the iterator to a key
// whose version is less than or equal to requested iterator version, or
// exhausts the iterator.
tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion)
if err != nil {
itr.valid = false
return
}
// If iterator is at a entry whose version is higher than requested version,
// call nextForward again.
if tmpKeyVersionDecoded > itr.version {
itr.nextForward()
}
// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
itr.nextForward()
}
return
}
itr.valid = false
}
func (itr *iterator) nextReverse() {
if !itr.source.Valid() {
itr.valid = false
return
}
currKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
next := itr.source.SeekLT(MVCCEncode(currKey, 0))
// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
if !bytes.HasPrefix(nextKey, itr.prefix) {
// the next key must have itr.prefix as the prefix
itr.valid = false
return
}
// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
_, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
// We need to verify that every Next call either moves the iterator to a key
// whose version is less than or equal to requested iterator version, or
// exhausts the iterator.
tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion)
if err != nil {
itr.valid = false
return
}
// If iterator is at a entry whose version is higher than requested version,
// call nextReverse again.
if tmpKeyVersionDecoded > itr.version {
itr.nextReverse()
}
// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
itr.nextReverse()
}
return
}
itr.valid = false
}

View File

@ -3,7 +3,9 @@ package storage
import (
"fmt"
"slices"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"cosmossdk.io/store/v2"
@ -402,30 +404,11 @@ func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() {
defer db.Close()
cs := store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyC"), Value: []byte("value003")},
}})
s.Require().NoError(db.ApplyChangeset(58827506, cs))
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyE"), Value: []byte("value000")},
}})
s.Require().NoError(db.ApplyChangeset(58827506, cs))
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyF"), Value: []byte("value000")},
}})
s.Require().NoError(db.ApplyChangeset(58827506, cs))
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyC"), Value: []byte("value004")},
}})
s.Require().NoError(db.ApplyChangeset(58833605, cs))
cs = store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey1: {
{Key: []byte("keyD"), Value: []byte("value006")},
}})
s.Require().NoError(db.ApplyChangeset(58833606, cs))
DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")})
DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value000")})
DBApplyChangeset(s.T(), db, 58827506, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value000")})
DBApplyChangeset(s.T(), db, 58833605, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value004")})
DBApplyChangeset(s.T(), db, 58833606, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value006")})
itr, err := db.Iterator(storeKey1, 58831525, []byte("key"), nil)
s.Require().NoError(err)
@ -439,6 +422,60 @@ func (s *StorageTestSuite) TestDatabaseIterator_SkipVersion() {
s.Require().Equal(3, len(count))
}
func (s *StorageTestSuite) TestDatabaseIterator_ForwardIteration() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)
defer db.Close()
DBApplyChangeset(s.T(), db, 8, storeKey1, [][]byte{[]byte("keyA")}, [][]byte{[]byte("value001")})
DBApplyChangeset(s.T(), db, 9, storeKey1, [][]byte{[]byte("keyB")}, [][]byte{[]byte("value002")})
DBApplyChangeset(s.T(), db, 10, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")})
DBApplyChangeset(s.T(), db, 11, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value004")})
DBApplyChangeset(s.T(), db, 2, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value007")})
DBApplyChangeset(s.T(), db, 3, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value008")})
DBApplyChangeset(s.T(), db, 4, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value009")})
DBApplyChangeset(s.T(), db, 5, storeKey1, [][]byte{[]byte("keyH")}, [][]byte{[]byte("value010")})
itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ"))
s.Require().NoError(err)
defer itr.Close()
count := 0
for ; itr.Valid(); itr.Next() {
count++
}
s.Require().Equal(4, count)
}
func (s *StorageTestSuite) TestDatabaseIterator_ForwardIterationHigher() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)
defer db.Close()
DBApplyChangeset(s.T(), db, 9, storeKey1, [][]byte{[]byte("keyB")}, [][]byte{[]byte("value002")})
DBApplyChangeset(s.T(), db, 10, storeKey1, [][]byte{[]byte("keyC")}, [][]byte{[]byte("value003")})
DBApplyChangeset(s.T(), db, 11, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value004")})
DBApplyChangeset(s.T(), db, 12, storeKey1, [][]byte{[]byte("keyD")}, [][]byte{[]byte("value007")})
DBApplyChangeset(s.T(), db, 13, storeKey1, [][]byte{[]byte("keyE")}, [][]byte{[]byte("value008")})
DBApplyChangeset(s.T(), db, 14, storeKey1, [][]byte{[]byte("keyF")}, [][]byte{[]byte("value009")})
DBApplyChangeset(s.T(), db, 15, storeKey1, [][]byte{[]byte("keyH")}, [][]byte{[]byte("value010")})
itr, err := db.Iterator(storeKey1, 6, nil, []byte("keyZ"))
s.Require().NoError(err)
defer itr.Close()
count := 0
for ; itr.Valid(); itr.Next() {
count++
}
s.Require().Equal(0, count)
}
func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() {
db, err := s.NewDB(s.T().TempDir())
s.Require().NoError(err)
@ -593,3 +630,23 @@ func (s *StorageTestSuite) TestDatabase_Prune_KeepRecent() {
s.Require().NoError(err)
s.Require().Equal([]byte("val200"), bz)
}
func DBApplyChangeset(
t *testing.T,
db store.VersionedDatabase,
version uint64,
storeKey string,
keys, vals [][]byte,
) {
t.Helper()
require.Greater(t, version, uint64(0))
require.Equal(t, len(keys), len(vals))
cs := store.NewChangeset()
for i := 0; i < len(keys); i++ {
cs.AddKVPair(storeKey, store.KVPair{Key: keys[i], Value: vals[i]})
}
require.NoError(t, db.ApplyChangeset(version, cs))
}