feat(store/v2): support reverse iteration for PebbleDB (#18193)
This commit is contained in:
parent
3c9168d460
commit
7590e91914
@ -182,11 +182,31 @@ func (db *Database) Iterator(storeKey string, version uint64, start, end []byte)
|
||||
return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
|
||||
}
|
||||
|
||||
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version), nil
|
||||
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, false), nil
|
||||
}
|
||||
|
||||
func (db *Database) ReverseIterator(storeKey string, version uint64, start, end []byte) (store.Iterator, error) {
|
||||
panic("not implemented!")
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, store.ErrKeyEmpty
|
||||
}
|
||||
|
||||
if start != nil && end != nil && bytes.Compare(start, end) > 0 {
|
||||
return nil, store.ErrStartAfterEnd
|
||||
}
|
||||
|
||||
lowerBound := MVCCEncode(prependStoreKey(storeKey, start), 0)
|
||||
|
||||
var upperBound []byte
|
||||
if end != nil {
|
||||
upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
|
||||
}
|
||||
|
||||
itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
|
||||
}
|
||||
|
||||
return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, true), nil
|
||||
}
|
||||
|
||||
func storePrefix(storeKey string) []byte {
|
||||
|
||||
@ -3,17 +3,12 @@ package pebbledb
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"cosmossdk.io/store/v2"
|
||||
"cosmossdk.io/store/v2/storage"
|
||||
)
|
||||
|
||||
const (
|
||||
storeKey1 = "store1"
|
||||
)
|
||||
|
||||
func TestStorageTestSuite(t *testing.T) {
|
||||
s := &storage.StorageTestSuite{
|
||||
NewDB: func(dir string) (store.VersionedDatabase, error) {
|
||||
@ -26,11 +21,3 @@ func TestStorageTestSuite(t *testing.T) {
|
||||
}
|
||||
suite.Run(t, s)
|
||||
}
|
||||
|
||||
func TestDatabase_ReverseIterator(t *testing.T) {
|
||||
db, err := New(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
defer db.Close()
|
||||
|
||||
require.Panics(t, func() { _, _ = db.ReverseIterator(storeKey1, 1, []byte("key000"), nil) })
|
||||
}
|
||||
|
||||
@ -26,14 +26,21 @@ type iterator struct {
|
||||
prefix, start, end []byte
|
||||
version uint64
|
||||
valid bool
|
||||
reverse bool
|
||||
}
|
||||
|
||||
func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version uint64) *iterator {
|
||||
func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version uint64, reverse bool) *iterator {
|
||||
// move the underlying PebbleDB iterator to the first key
|
||||
valid := src.First()
|
||||
var valid bool
|
||||
if reverse {
|
||||
valid = src.Last()
|
||||
} else {
|
||||
valid = src.First()
|
||||
}
|
||||
|
||||
if valid {
|
||||
// The first key may not represent the desired target version, so move the
|
||||
// cursor to the correct location.
|
||||
// The first key may not represent the desired target version, so seek to
|
||||
// the correct location by moving the cursor to the first key < version + 1.
|
||||
firstKey, _, ok := SplitMVCCKey(src.Key())
|
||||
if !ok {
|
||||
// XXX: This should not happen as that would indicate we have a malformed
|
||||
@ -51,6 +58,7 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte
|
||||
end: mvccEnd,
|
||||
version: version,
|
||||
valid: valid,
|
||||
reverse: reverse,
|
||||
}
|
||||
|
||||
// The cursor might now be pointing at a key/value pair that is tombstoned.
|
||||
@ -96,10 +104,27 @@ func (itr *iterator) Value() []byte {
|
||||
}
|
||||
|
||||
func (itr *iterator) Next() bool {
|
||||
var next bool
|
||||
if itr.reverse {
|
||||
currKey, _, ok := SplitMVCCKey(itr.source.Key())
|
||||
if !ok {
|
||||
// XXX: This should not happen as that would indicate we have a malformed
|
||||
// MVCC key.
|
||||
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
|
||||
}
|
||||
|
||||
// Since PebbleDB has no PrevPrefix API, we must manually seek to the next
|
||||
// key that is lexicographically less than the current key.
|
||||
next = itr.source.SeekLT(MVCCEncode(currKey, 0))
|
||||
} else {
|
||||
// move the cursor to the next key
|
||||
next = itr.source.NextPrefix()
|
||||
}
|
||||
|
||||
// First move the iterator to the next prefix, which may not correspond to the
|
||||
// desired version for that key, e.g. if the key was written at a later version,
|
||||
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
|
||||
if itr.source.NextPrefix() {
|
||||
if next {
|
||||
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
|
||||
if !ok {
|
||||
// XXX: This should not happen as that would indicate we have a malformed
|
||||
@ -237,7 +262,14 @@ func (itr *iterator) DebugRawIterate() {
|
||||
|
||||
fmt.Printf("KEY: %s, VALUE: %s, VERSION: %d, TOMBSTONE: %d\n", key, val, version, tombstone)
|
||||
|
||||
if itr.source.NextPrefix() {
|
||||
var next bool
|
||||
if itr.reverse {
|
||||
next = itr.source.SeekLT(MVCCEncode(key, 0))
|
||||
} else {
|
||||
next = itr.source.NextPrefix()
|
||||
}
|
||||
|
||||
if next {
|
||||
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user