fix(store/v2): fix the pebbledb storage implementation (#21837)
Co-authored-by: Julien Robert <julien@rbrt.fr>
This commit is contained in:
parent
06c2fc72df
commit
a7a0daf704
@ -68,27 +68,27 @@ func New(dataDir string) (*Database, error) {
|
||||
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
|
||||
}
|
||||
|
||||
pruneHeight, err := getPruneHeight(db)
|
||||
earliestVersion, err := getEarliestVersion(db)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get prune height: %w", err)
|
||||
return nil, fmt.Errorf("failed to get the earliest version: %w", err)
|
||||
}
|
||||
|
||||
return &Database{
|
||||
storage: db,
|
||||
earliestVersion: pruneHeight + 1,
|
||||
earliestVersion: earliestVersion,
|
||||
sync: true,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewWithDB(storage *pebble.DB, sync bool) *Database {
|
||||
pruneHeight, err := getPruneHeight(storage)
|
||||
earliestVersion, err := getEarliestVersion(storage)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to get prune height: %w", err))
|
||||
panic(fmt.Errorf("failed to get the earliest version: %w", err))
|
||||
}
|
||||
|
||||
return &Database{
|
||||
storage: storage,
|
||||
earliestVersion: pruneHeight + 1,
|
||||
earliestVersion: earliestVersion,
|
||||
sync: sync,
|
||||
}
|
||||
}
|
||||
@ -362,7 +362,10 @@ func prependStoreKey(storeKey, key []byte) []byte {
|
||||
return []byte(fmt.Sprintf("%s%s", storePrefix(storeKey), key))
|
||||
}
|
||||
|
||||
func getPruneHeight(storage *pebble.DB) (uint64, error) {
|
||||
// getEarliestVersion returns the earliest version set in the database.
|
||||
// It is calculated by prune height + 1. If the prune height is not set, it
|
||||
// returns 0.
|
||||
func getEarliestVersion(storage *pebble.DB) (uint64, error) {
|
||||
bz, closer, err := storage.Get([]byte(pruneHeightKey))
|
||||
if err != nil {
|
||||
if errors.Is(err, pebble.ErrNotFound) {
|
||||
@ -377,7 +380,7 @@ func getPruneHeight(storage *pebble.DB) (uint64, error) {
|
||||
return 0, closer.Close()
|
||||
}
|
||||
|
||||
return binary.LittleEndian.Uint64(bz), closer.Close()
|
||||
return binary.LittleEndian.Uint64(bz) + 1, closer.Close()
|
||||
}
|
||||
|
||||
func valTombstoned(value []byte) bool {
|
||||
|
||||
@ -82,6 +82,12 @@ func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte
|
||||
// so there exists at least one version of currKey SeekLT may move to.
|
||||
itr.valid = itr.source.SeekLT(MVCCEncode(currKey, itr.version+1))
|
||||
}
|
||||
|
||||
// The cursor might now be pointing at a key/value pair that is tombstoned.
|
||||
// If so, we must move the cursor.
|
||||
if itr.valid && itr.cursorTombstoned() {
|
||||
itr.Next()
|
||||
}
|
||||
}
|
||||
return itr
|
||||
}
|
||||
|
||||
@ -485,6 +485,33 @@ func (s *StorageTestSuite) TestDatabaseIterator_ForwardIterationHigher() {
|
||||
s.Require().Equal(0, count)
|
||||
}
|
||||
|
||||
func (s *StorageTestSuite) TestDatabaseIterator_WithDelete() {
|
||||
db, err := s.NewDB(s.T().TempDir())
|
||||
s.Require().NoError(err)
|
||||
defer db.Close()
|
||||
|
||||
dbApplyChangeset(s.T(), db, 1, storeKey1, [][]byte{[]byte("keyA")}, [][]byte{[]byte("value001")})
|
||||
dbApplyChangeset(s.T(), db, 2, storeKey1, [][]byte{[]byte("keyA")}, [][]byte{nil}) // delete
|
||||
|
||||
itr, err := db.Iterator(storeKey1Bytes, 1, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
count := 0
|
||||
for ; itr.Valid(); itr.Next() {
|
||||
count++
|
||||
}
|
||||
s.Require().Equal(1, count)
|
||||
|
||||
itr, err = db.Iterator(storeKey1Bytes, 2, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
count = 0
|
||||
for ; itr.Valid(); itr.Next() {
|
||||
count++
|
||||
}
|
||||
s.Require().Equal(0, count)
|
||||
}
|
||||
|
||||
func (s *StorageTestSuite) TestDatabase_IteratorNoDomain() {
|
||||
db, err := s.NewDB(s.T().TempDir())
|
||||
s.Require().NoError(err)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user