refactor: make cachekv store thread-safe again (#14378)
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
This commit is contained in:
parent
9983148b7a
commit
f1ee974ec8
@ -128,6 +128,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
|
||||
* (x/auth/vesting) [#13502](https://github.com/cosmos/cosmos-sdk/pull/13502) Add Amino Msg registration for `MsgCreatePeriodicVestingAccount`.
|
||||
* (x/auth)[#13780](https://github.com/cosmos/cosmos-sdk/pull/13780) `id` (type of int64) in `AccountAddressByID` grpc query is now deprecated, update to account-id(type of uint64) to use `AccountAddressByID`.
|
||||
* (x/group) [#13876](https://github.com/cosmos/cosmos-sdk/pull/13876) Fix group MinExecutionPeriod that is checked on execution now, instead of voting period end.
|
||||
* (store) [#14378](https://github.com/cosmos/cosmos-sdk/pull/14378) The `CacheKV` store is thread-safe again, which includes improved iteration and deletion logic. Iteration is on a strictly isolated view now, which is breaking from previous behavior.
|
||||
|
||||
### API Breaking Changes
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/store/types"
|
||||
"github.com/tidwall/btree"
|
||||
)
|
||||
|
||||
@ -21,23 +22,24 @@ var errKeyEmpty = errors.New("key cannot be empty")
|
||||
//
|
||||
// We choose tidwall/btree over google/btree here because it provides API to implement step iterator directly.
|
||||
type BTree struct {
|
||||
tree btree.BTreeG[item]
|
||||
tree *btree.BTreeG[item]
|
||||
}
|
||||
|
||||
// NewBTree creates a wrapper around `btree.BTreeG`.
|
||||
func NewBTree() *BTree {
|
||||
return &BTree{tree: *btree.NewBTreeGOptions(byKeys, btree.Options{
|
||||
Degree: bTreeDegree,
|
||||
// Contract: cachekv store must not be called concurrently
|
||||
NoLocks: true,
|
||||
})}
|
||||
func NewBTree() BTree {
|
||||
return BTree{
|
||||
tree: btree.NewBTreeGOptions(byKeys, btree.Options{
|
||||
Degree: bTreeDegree,
|
||||
NoLocks: false,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (bt *BTree) Set(key, value []byte) {
|
||||
func (bt BTree) Set(key, value []byte) {
|
||||
bt.tree.Set(newItem(key, value))
|
||||
}
|
||||
|
||||
func (bt *BTree) Get(key []byte) []byte {
|
||||
func (bt BTree) Get(key []byte) []byte {
|
||||
i, found := bt.tree.Get(newItem(key, nil))
|
||||
if !found {
|
||||
return nil
|
||||
@ -45,22 +47,30 @@ func (bt *BTree) Get(key []byte) []byte {
|
||||
return i.value
|
||||
}
|
||||
|
||||
func (bt *BTree) Delete(key []byte) {
|
||||
func (bt BTree) Delete(key []byte) {
|
||||
bt.tree.Delete(newItem(key, nil))
|
||||
}
|
||||
|
||||
func (bt *BTree) Iterator(start, end []byte) (*memIterator, error) { //nolint:revive
|
||||
func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) {
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, errKeyEmpty
|
||||
}
|
||||
return NewMemIterator(start, end, bt, make(map[string]struct{}), true), nil
|
||||
return newMemIterator(start, end, bt, true), nil
|
||||
}
|
||||
|
||||
func (bt *BTree) ReverseIterator(start, end []byte) (*memIterator, error) { //nolint:revive
|
||||
func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
|
||||
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
|
||||
return nil, errKeyEmpty
|
||||
}
|
||||
return NewMemIterator(start, end, bt, make(map[string]struct{}), false), nil
|
||||
return newMemIterator(start, end, bt, false), nil
|
||||
}
|
||||
|
||||
// Copy the tree. This is a copy-on-write operation and is very fast because
|
||||
// it only performs a shadowed copy.
|
||||
func (bt BTree) Copy() BTree {
|
||||
return BTree{
|
||||
tree: bt.tree.Copy(),
|
||||
}
|
||||
}
|
||||
|
||||
// item is a btree item with byte slices as keys and values
|
||||
|
||||
@ -3,6 +3,7 @@ package internal
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/cosmos/cosmos-sdk/store/types"
|
||||
sdk "github.com/cosmos/cosmos-sdk/types"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -181,7 +182,7 @@ func TestDBIterator(t *testing.T) {
|
||||
verifyIterator(t, ritr, nil, "reverse iterator with empty db")
|
||||
}
|
||||
|
||||
func verifyIterator(t *testing.T, itr *memIterator, expected []int64, msg string) {
|
||||
func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) {
|
||||
i := 0
|
||||
for itr.Valid() {
|
||||
key := itr.Key()
|
||||
|
||||
@ -11,7 +11,7 @@ import (
|
||||
var _ types.Iterator = (*memIterator)(nil)
|
||||
|
||||
// memIterator iterates over iterKVCache items.
|
||||
// if key is nil, means it was deleted.
|
||||
// if value is nil, means it was deleted.
|
||||
// Implements Iterator.
|
||||
type memIterator struct {
|
||||
iter btree.IterG[item]
|
||||
@ -19,12 +19,10 @@ type memIterator struct {
|
||||
start []byte
|
||||
end []byte
|
||||
ascending bool
|
||||
lastKey []byte
|
||||
deleted map[string]struct{}
|
||||
valid bool
|
||||
}
|
||||
|
||||
func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}, ascending bool) *memIterator { //nolint:revive
|
||||
func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator {
|
||||
iter := items.tree.Iter()
|
||||
var valid bool
|
||||
if ascending {
|
||||
@ -52,8 +50,6 @@ func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}
|
||||
start: start,
|
||||
end: end,
|
||||
ascending: ascending,
|
||||
lastKey: nil,
|
||||
deleted: deleted,
|
||||
valid: valid,
|
||||
}
|
||||
|
||||
@ -113,21 +109,7 @@ func (mi *memIterator) Key() []byte {
|
||||
}
|
||||
|
||||
func (mi *memIterator) Value() []byte {
|
||||
item := mi.iter.Item()
|
||||
key := item.key
|
||||
// We need to handle the case where deleted is modified and includes our current key
|
||||
// We handle this by maintaining a lastKey object in the iterator.
|
||||
// If the current key is the same as the last key (and last key is not nil / the start)
|
||||
// then we are calling value on the same thing as last time.
|
||||
// Therefore we don't check the mi.deleted to see if this key is included in there.
|
||||
if _, ok := mi.deleted[string(key)]; ok {
|
||||
if mi.lastKey == nil || !bytes.Equal(key, mi.lastKey) {
|
||||
// not re-calling on old last key
|
||||
return nil
|
||||
}
|
||||
}
|
||||
mi.lastKey = key
|
||||
return item.value
|
||||
return mi.iter.Item().value
|
||||
}
|
||||
|
||||
func (mi *memIterator) assertValid() {
|
||||
|
||||
@ -24,14 +24,11 @@ type cValue struct {
|
||||
}
|
||||
|
||||
// Store wraps an in-memory cache around an underlying types.KVStore.
|
||||
// If a cached value is nil but deleted is defined for the corresponding key,
|
||||
// it means the parent doesn't have the key. (No need to delete upon Write())
|
||||
type Store struct {
|
||||
mtx sync.Mutex
|
||||
cache map[string]*cValue
|
||||
deleted map[string]struct{}
|
||||
unsortedCache map[string]struct{}
|
||||
sortedCache *internal.BTree // always ascending sorted
|
||||
sortedCache internal.BTree // always ascending sorted
|
||||
parent types.KVStore
|
||||
}
|
||||
|
||||
@ -41,7 +38,6 @@ var _ types.CacheKVStore = (*Store)(nil)
|
||||
func NewStore(parent types.KVStore) *Store {
|
||||
return &Store{
|
||||
cache: make(map[string]*cValue),
|
||||
deleted: make(map[string]struct{}),
|
||||
unsortedCache: make(map[string]struct{}),
|
||||
sortedCache: internal.NewBTree(),
|
||||
parent: parent,
|
||||
@ -63,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) {
|
||||
cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)]
|
||||
if !ok {
|
||||
value = store.parent.Get(key)
|
||||
store.setCacheValue(key, value, false, false)
|
||||
store.setCacheValue(key, value, false)
|
||||
} else {
|
||||
value = cacheValue.value
|
||||
}
|
||||
@ -79,7 +75,7 @@ func (store *Store) Set(key []byte, value []byte) {
|
||||
types.AssertValidKey(key)
|
||||
types.AssertValidValue(value)
|
||||
|
||||
store.setCacheValue(key, value, false, true)
|
||||
store.setCacheValue(key, value, true)
|
||||
}
|
||||
|
||||
// Has implements types.KVStore.
|
||||
@ -94,7 +90,7 @@ func (store *Store) Delete(key []byte) {
|
||||
defer store.mtx.Unlock()
|
||||
|
||||
types.AssertValidKey(key)
|
||||
store.setCacheValue(key, nil, true, true)
|
||||
store.setCacheValue(key, nil, true)
|
||||
}
|
||||
|
||||
// Implements Cachetypes.KVStore.
|
||||
@ -102,7 +98,7 @@ func (store *Store) Write() {
|
||||
store.mtx.Lock()
|
||||
defer store.mtx.Unlock()
|
||||
|
||||
if len(store.cache) == 0 && len(store.deleted) == 0 && len(store.unsortedCache) == 0 {
|
||||
if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
|
||||
store.sortedCache = internal.NewBTree()
|
||||
return
|
||||
}
|
||||
@ -122,19 +118,16 @@ func (store *Store) Write() {
|
||||
// TODO: Consider allowing usage of Batch, which would allow the write to
|
||||
// at least happen atomically.
|
||||
for _, key := range keys {
|
||||
if store.isDeleted(key) {
|
||||
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
|
||||
// be sure if the underlying store might do a save with the byteslice or
|
||||
// not. Once we get confirmation that .Delete is guaranteed not to
|
||||
// save the byteslice, then we can assume only a read-only copy is sufficient.
|
||||
store.parent.Delete([]byte(key))
|
||||
continue
|
||||
}
|
||||
|
||||
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
|
||||
// be sure if the underlying store might do a save with the byteslice or
|
||||
// not. Once we get confirmation that .Delete is guaranteed not to
|
||||
// save the byteslice, then we can assume only a read-only copy is sufficient.
|
||||
cacheValue := store.cache[key]
|
||||
if cacheValue.value != nil {
|
||||
// It already exists in the parent, hence delete it.
|
||||
// It already exists in the parent, hence update it.
|
||||
store.parent.Set([]byte(key), cacheValue.value)
|
||||
} else {
|
||||
store.parent.Delete([]byte(key))
|
||||
}
|
||||
}
|
||||
|
||||
@ -144,9 +137,6 @@ func (store *Store) Write() {
|
||||
for key := range store.cache {
|
||||
delete(store.cache, key)
|
||||
}
|
||||
for key := range store.deleted {
|
||||
delete(store.deleted, key)
|
||||
}
|
||||
for key := range store.unsortedCache {
|
||||
delete(store.unsortedCache, key)
|
||||
}
|
||||
@ -180,16 +170,24 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
|
||||
store.mtx.Lock()
|
||||
defer store.mtx.Unlock()
|
||||
|
||||
var parent, cache types.Iterator
|
||||
store.dirtyItems(start, end)
|
||||
isoSortedCache := store.sortedCache.Copy()
|
||||
|
||||
var (
|
||||
err error
|
||||
parent, cache types.Iterator
|
||||
)
|
||||
|
||||
if ascending {
|
||||
parent = store.parent.Iterator(start, end)
|
||||
cache, err = isoSortedCache.Iterator(start, end)
|
||||
} else {
|
||||
parent = store.parent.ReverseIterator(start, end)
|
||||
cache, err = isoSortedCache.ReverseIterator(start, end)
|
||||
}
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
store.dirtyItems(start, end)
|
||||
cache = internal.NewMemIterator(start, end, store.sortedCache, store.deleted, ascending)
|
||||
|
||||
return internal.NewCacheMergeIterator(parent, cache, ascending)
|
||||
}
|
||||
@ -370,13 +368,7 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
|
||||
}
|
||||
|
||||
for _, item := range unsorted {
|
||||
if item.Value == nil {
|
||||
// deleted element, tracked by store.deleted
|
||||
// setting arbitrary value
|
||||
store.sortedCache.Set(item.Key, []byte{})
|
||||
continue
|
||||
}
|
||||
|
||||
// sortedCache is able to store `nil` value to represent deleted items.
|
||||
store.sortedCache.Set(item.Key, item.Value)
|
||||
}
|
||||
}
|
||||
@ -385,23 +377,14 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
|
||||
// etc
|
||||
|
||||
// Only entrypoint to mutate store.cache.
|
||||
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
|
||||
// A `nil` value means a deletion.
|
||||
func (store *Store) setCacheValue(key, value []byte, dirty bool) {
|
||||
keyStr := conv.UnsafeBytesToStr(key)
|
||||
store.cache[keyStr] = &cValue{
|
||||
value: value,
|
||||
dirty: dirty,
|
||||
}
|
||||
if deleted {
|
||||
store.deleted[keyStr] = struct{}{}
|
||||
} else {
|
||||
delete(store.deleted, keyStr)
|
||||
}
|
||||
if dirty {
|
||||
store.unsortedCache[keyStr] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (store *Store) isDeleted(key string) bool {
|
||||
_, ok := store.deleted[key]
|
||||
return ok
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user