fix(store/v2/pebble): handle version 0 in keys (#22524)
This commit is contained in:
parent
e74799e1c3
commit
005ecad6c9
@ -17,6 +17,17 @@ type Batch struct {
|
||||
batch *pebble.Batch
|
||||
version uint64
|
||||
sync bool
|
||||
size int
|
||||
}
|
||||
|
||||
const (
|
||||
oneIf64Bit = ^uint(0) >> 63
|
||||
maxUint32OrInt = (1<<31)<<oneIf64Bit - 1
|
||||
maxVarintLen32 = 5
|
||||
)
|
||||
|
||||
func keyValueSize(key, value []byte) int {
|
||||
return len(key) + len(value) + 1 + 2*maxVarintLen32
|
||||
}
|
||||
|
||||
func NewBatch(storage *pebble.DB, version uint64, sync bool) (*Batch, error) {
|
||||
@ -34,6 +45,7 @@ func NewBatch(storage *pebble.DB, version uint64, sync bool) (*Batch, error) {
|
||||
batch: batch,
|
||||
version: version,
|
||||
sync: sync,
|
||||
size: keyValueSize([]byte(latestVersionKey), versionBz[:]),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -50,9 +62,20 @@ func (b *Batch) set(storeKey []byte, tombstone uint64, key, value []byte) error
|
||||
prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), b.version)
|
||||
prefixedVal := MVCCEncode(value, tombstone)
|
||||
|
||||
size := keyValueSize(prefixedKey, prefixedVal)
|
||||
if b.size+size > maxUint32OrInt {
|
||||
// 4 GB is huge, probably genesis; flush and reset
|
||||
if err := b.batch.Commit(&pebble.WriteOptions{Sync: b.sync}); err != nil {
|
||||
return fmt.Errorf("max batch size exceed: failed to write PebbleDB batch: %w", err)
|
||||
}
|
||||
b.batch.Reset()
|
||||
b.size = 0
|
||||
}
|
||||
|
||||
if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
|
||||
return fmt.Errorf("failed to write PebbleDB batch: %w", err)
|
||||
}
|
||||
b.size += size
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -238,11 +238,14 @@ func (db *Database) Prune(version uint64) (err error) {
|
||||
return fmt.Errorf("invalid PebbleDB MVCC key: %s", prefixedKey)
|
||||
}
|
||||
|
||||
keyVersion, err := decodeUint64Ascending(verBz)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode key version: %w", err)
|
||||
var keyVersion uint64
|
||||
// handle version 0 (no version prefix)
|
||||
if len(verBz) > 0 {
|
||||
keyVersion, err = decodeUint64Ascending(verBz)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode key version: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// seek to next key if we are at a version which is higher than prune height
|
||||
if keyVersion > version {
|
||||
itr.NextPrefix()
|
||||
@ -432,9 +435,13 @@ func getMVCCSlice(db *pebble.DB, storeKey, key []byte, version uint64) ([]byte,
|
||||
return nil, fmt.Errorf("invalid PebbleDB MVCC key: %s", itr.Key())
|
||||
}
|
||||
|
||||
keyVersion, err := decodeUint64Ascending(vBz)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode key version: %w", err)
|
||||
var keyVersion uint64
|
||||
// handle version 0 (no version prefix)
|
||||
if len(vBz) > 0 {
|
||||
keyVersion, err = decodeUint64Ascending(vBz)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decode key version: %w", err)
|
||||
}
|
||||
}
|
||||
if keyVersion > version {
|
||||
return nil, fmt.Errorf("key version too large: %d", keyVersion)
|
||||
|
||||
@ -216,15 +216,20 @@ func (itr *iterator) DebugRawIterate() {
|
||||
valid = itr.source.SeekLT(MVCCEncode(firstKey, itr.version+1))
|
||||
}
|
||||
|
||||
var err error
|
||||
for valid {
|
||||
key, vBz, ok := SplitMVCCKey(itr.source.Key())
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
|
||||
}
|
||||
|
||||
version, err := decodeUint64Ascending(vBz)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to decode key version: %w", err))
|
||||
var version uint64
|
||||
// handle version 0 (no version prefix)
|
||||
if len(vBz) > 0 {
|
||||
version, err = decodeUint64Ascending(vBz)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("failed to decode key version: %w", err))
|
||||
}
|
||||
}
|
||||
|
||||
val, tombBz, ok := SplitMVCCKey(itr.source.Value())
|
||||
|
||||
Loading…
Reference in New Issue
Block a user