refactor(store/v2): Refactor RootStore (#19012)

This commit is contained in:
Aleksandr Bezobchuk 2024-01-12 13:22:42 -05:00 committed by GitHub
parent 5452586687
commit 60b0221405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 229 additions and 2252 deletions

View File

@ -1,10 +0,0 @@
# Branch KVStore
The `branch.Store` implementation defines a `BranchedKVStore` that contains a
reference to a `VersionedDatabase`, i.e. an SS backend. The `branch.Store` is
meant to be used as the primary store used in a `RootStore` implementation. It
provides the ability to get the current `ChangeSet`, branching, and writing to
a parent store (if one is defined). Note, all reads first pass through the
staged, i.e. dirty writes. If a key is not found in the staged writes, the read
is then passed to the parent store (if one is defined), finally falling back to
the backing SS engine.

View File

@ -1,148 +0,0 @@
package branch
import (
"slices"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
var _ corestore.Iterator = (*iterator)(nil)
// iterator walks over both the KVStore's changeset, i.e. dirty writes, and the
// parent iterator, which can either be another KVStore or the SS backend, at the
// same time.
//
// Note, writes that happen on the KVStore over an iterator will not affect the
// iterator. This is because when an iterator is created, it takes a current
// snapshot of the changeset.
type iterator struct {
parentItr corestore.Iterator
start []byte
end []byte
key []byte
value []byte
keys []string
values store.KVPairs
reverse bool
exhausted bool // exhausted reflects if the parent iterator is exhausted or not
}
// Domain returns the domain of the iterator. The caller must not modify the
// return values.
func (itr *iterator) Domain() ([]byte, []byte) {
return itr.start, itr.end
}
func (itr *iterator) Key() []byte {
return slices.Clone(itr.key)
}
func (itr *iterator) Value() []byte {
return slices.Clone(itr.value)
}
func (itr *iterator) Close() error {
itr.key = nil
itr.value = nil
itr.keys = nil
itr.values = nil
return itr.parentItr.Close()
}
func (itr *iterator) Next() {
for {
switch {
case itr.exhausted && len(itr.keys) == 0: // exhausted both
itr.key = nil
itr.value = nil
return
case itr.exhausted: // exhausted parent iterator but not store (dirty writes) iterator
nextKey := itr.keys[0]
nextValue := itr.values[0]
// pop off the key
itr.keys[0] = ""
itr.keys = itr.keys[1:]
// pop off the value
itr.values[0].Value = nil
itr.values = itr.values[1:]
if nextValue.Value != nil {
itr.key = []byte(nextKey)
itr.value = nextValue.Value
return
}
case len(itr.keys) == 0: // exhausted store (dirty writes) iterator but not parent iterator
itr.key = itr.parentItr.Key()
itr.value = itr.parentItr.Value()
itr.parentItr.Next()
itr.exhausted = !itr.parentItr.Valid()
return
default: // parent iterator is not exhausted and we have store (dirty writes) remaining
dirtyKey := itr.keys[0]
dirtyVal := itr.values[0]
parentKey := itr.parentItr.Key()
parentKeyStr := string(parentKey)
switch {
case (!itr.reverse && dirtyKey < parentKeyStr) || (itr.reverse && dirtyKey > parentKeyStr): // dirty key should come before parent's key
// pop off key
itr.keys[0] = ""
itr.keys = itr.keys[1:]
// pop off value
itr.values[0].Value = nil
itr.values = itr.values[1:]
if dirtyVal.Value != nil {
itr.key = []byte(dirtyKey)
itr.value = dirtyVal.Value
return
}
case (!itr.reverse && parentKeyStr < dirtyKey) || (itr.reverse && parentKeyStr > dirtyKey): // parent's key should come before dirty key
itr.key = parentKey
itr.value = itr.parentItr.Value()
itr.parentItr.Next()
itr.exhausted = !itr.parentItr.Valid()
return
default:
// pop off key
itr.keys[0] = ""
itr.keys = itr.keys[1:]
// pop off value
itr.values[0].Value = nil
itr.values = itr.values[1:]
itr.parentItr.Next()
itr.exhausted = !itr.parentItr.Valid()
if dirtyVal.Value != nil {
itr.key = []byte(dirtyKey)
itr.value = dirtyVal.Value
return
}
}
}
}
}
func (itr *iterator) Valid() bool {
return itr.key != nil && itr.value != nil
}
func (itr *iterator) Error() error {
return itr.parentItr.Error()
}

View File

@ -1,314 +0,0 @@
package branch
import (
"fmt"
"io"
"slices"
"sync"
"golang.org/x/exp/maps"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/trace"
)
var _ store.BranchedKVStore = (*Store)(nil)
// Store implements both a KVStore and BranchedKVStore interfaces. It is used to
// accumulate writes that can be later committed to backing SS and SC engines or
// discarded altogether. If a read is not found through an uncommitted write, it
// will be delegated to the SS backend.
type Store struct {
mu sync.Mutex
// storage reflects backing storage (SS) for reads that are not found in uncommitted volatile state
storage store.VersionedDatabase
// version indicates the latest version to handle reads falling through to SS
version uint64
// storeKey reflects the store key used for the store
storeKey string
// parent reflects a parent store if branched (it may be nil)
parent store.KVStore
// changeset reflects the uncommitted writes to the store as it contains a mapping
// from key to a KVPair.
changeset map[string]store.KVPair
}
func New(storeKey string, ss store.VersionedDatabase) (store.BranchedKVStore, error) {
latestVersion, err := ss.GetLatestVersion()
if err != nil {
return nil, err
}
return &Store{
storage: ss,
storeKey: storeKey,
version: latestVersion,
changeset: make(map[string]store.KVPair),
}, nil
}
func NewWithParent(parent store.KVStore) store.BranchedKVStore {
return &Store{
parent: parent,
storeKey: parent.GetStoreKey(),
changeset: make(map[string]store.KVPair),
}
}
func (s *Store) GetStoreKey() string {
return s.storeKey
}
func (s *Store) GetStoreType() store.StoreType {
return store.StoreTypeBranch
}
// GetChangeset returns the uncommitted writes to the store, ordered by key.
func (s *Store) GetChangeset() *store.Changeset {
keys := maps.Keys(s.changeset)
slices.Sort(keys)
pairs := make(store.KVPairs, len(keys))
for i, key := range keys {
kvPair := s.changeset[key]
pairs[i] = store.KVPair{
Key: []byte(key),
Value: slices.Clone(kvPair.Value),
StoreKey: s.storeKey,
}
}
return store.NewChangesetWithPairs(map[string]store.KVPairs{s.storeKey: pairs})
}
func (s *Store) Reset(toVersion uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
if err := s.storage.SetLatestVersion(toVersion); err != nil {
return fmt.Errorf("failed to set SS latest version %d: %w", toVersion, err)
}
clear(s.changeset)
s.version = toVersion
return nil
}
func (s *Store) Branch() store.BranchedKVStore {
return NewWithParent(s)
}
func (s *Store) BranchWithTrace(w io.Writer, tc store.TraceContext) store.BranchedKVStore {
return NewWithParent(trace.New(s, w, tc))
}
func (s *Store) Has(key []byte) bool {
store.AssertValidKey(key)
s.mu.Lock()
defer s.mu.Unlock()
// if the write is present in the changeset, i.e. a dirty write, evaluate it
if kvPair, ok := s.changeset[string(key)]; ok {
// a non-nil value indicates presence
return kvPair.Value != nil
}
// if the store is branched, check the parent store
if s.parent != nil {
return s.parent.Has(key)
}
// otherwise, we fallback to SS
ok, err := s.storage.Has(s.storeKey, s.version, key)
if err != nil {
panic(err)
}
return ok
}
func (s *Store) Get(key []byte) []byte {
store.AssertValidKey(key)
s.mu.Lock()
defer s.mu.Unlock()
// if the write is present in the changeset, i.e. a dirty write, evaluate it
if kvPair, ok := s.changeset[string(key)]; ok {
if kvPair.Value == nil {
return nil
}
return slices.Clone(kvPair.Value)
}
// if the store is branched, check the parent store
if s.parent != nil {
return s.parent.Get(key)
}
// otherwise, we fallback to SS
bz, err := s.storage.Get(s.storeKey, s.version, key)
if err != nil {
panic(err)
}
return bz
}
func (s *Store) Set(key, value []byte) {
store.AssertValidKey(key)
store.AssertValidValue(value)
s.mu.Lock()
defer s.mu.Unlock()
// omit the key as that can be inferred from the map key
s.changeset[string(key)] = store.KVPair{Value: slices.Clone(value)}
}
func (s *Store) Delete(key []byte) {
store.AssertValidKey(key)
s.mu.Lock()
defer s.mu.Unlock()
// omit the key as that can be inferred from the map key
s.changeset[string(key)] = store.KVPair{Value: nil}
}
func (s *Store) Write() {
s.mu.Lock()
defer s.mu.Unlock()
// Note, we're only flushing the writes up to the parent, if it exists. We are
// not writing to the SS backend as that will happen in Commit().
if s.parent != nil {
keys := maps.Keys(s.changeset)
slices.Sort(keys)
// flush changes upstream to the parent in sorted order by key
for _, key := range keys {
kvPair := s.changeset[key]
if kvPair.Value == nil {
s.parent.Delete([]byte(key))
} else {
s.parent.Set([]byte(key), kvPair.Value)
}
}
}
}
// Iterator creates an iterator over the domain [start, end), which walks over
// both the KVStore's changeset, i.e. dirty writes, and the parent iterator,
// which can either be another KVStore or the SS backend, at the same time.
//
// Note, writes that happen on the KVStore over an iterator will not affect the
// iterator. This is because when an iterator is created, it takes a current
// snapshot of the changeset.
func (s *Store) Iterator(start, end []byte) corestore.Iterator {
s.mu.Lock()
defer s.mu.Unlock()
var parentItr corestore.Iterator
if s.parent != nil {
parentItr = s.parent.Iterator(start, end)
} else {
var err error
parentItr, err = s.storage.Iterator(s.storeKey, s.version, start, end)
if err != nil {
panic(err)
}
}
return s.newIterator(parentItr, start, end, false)
}
// ReverseIterator creates a reverse iterator over the domain [start, end), which
// walks over both the KVStore's changeset, i.e. dirty writes, and the parent
// iterator, which can either be another KVStore or the SS backend, at the same
// time.
//
// Note, writes that happen on the KVStore over an iterator will not affect the
// iterator. This is because when an iterator is created, it takes a current
// snapshot of the changeset.
func (s *Store) ReverseIterator(start, end []byte) corestore.Iterator {
s.mu.Lock()
defer s.mu.Unlock()
var parentItr corestore.Iterator
if s.parent != nil {
parentItr = s.parent.ReverseIterator(start, end)
} else {
var err error
parentItr, err = s.storage.ReverseIterator(s.storeKey, s.version, start, end)
if err != nil {
panic(err)
}
}
return s.newIterator(parentItr, start, end, true)
}
func (s *Store) newIterator(parentItr corestore.Iterator, start, end []byte, reverse bool) *iterator {
startStr := string(start)
endStr := string(end)
keys := make([]string, 0, len(s.changeset))
for key := range s.changeset {
switch {
case start != nil && end != nil:
if key >= startStr && key < endStr {
keys = append(keys, key)
}
case start != nil:
if key >= startStr {
keys = append(keys, key)
}
case end != nil:
if key < endStr {
keys = append(keys, key)
}
default:
keys = append(keys, key)
}
}
slices.Sort(keys)
if reverse {
slices.Reverse(keys)
}
values := make(store.KVPairs, len(keys))
for i, key := range keys {
values[i] = s.changeset[key]
}
itr := &iterator{
parentItr: parentItr,
start: start,
end: end,
keys: keys,
values: values,
reverse: reverse,
exhausted: !parentItr.Valid(),
}
// call Next() to move the iterator to the first key/value entry
itr.Next()
return itr
}

View File

@ -1,548 +0,0 @@
package branch_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/suite"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/branch"
"cosmossdk.io/store/v2/storage"
"cosmossdk.io/store/v2/storage/sqlite"
)
const storeKey = "storeKey"
type StoreTestSuite struct {
suite.Suite
storage store.VersionedDatabase
kvStore store.BranchedKVStore
}
func TestStorageTestSuite(t *testing.T) {
suite.Run(t, &StoreTestSuite{})
}
func (s *StoreTestSuite) SetupTest() {
sqliteDB, err := sqlite.New(s.T().TempDir())
ss := storage.NewStorageStore(sqliteDB)
s.Require().NoError(err)
cs := store.NewChangesetWithPairs(map[string]store.KVPairs{storeKey: {}})
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
cs.AddKVPair(storeKey, store.KVPair{Key: []byte(key), Value: []byte(val)})
}
s.Require().NoError(ss.ApplyChangeset(1, cs))
kvStore, err := branch.New(storeKey, ss)
s.Require().NoError(err)
s.storage = ss
s.kvStore = kvStore
}
func (s *StoreTestSuite) TestGetStoreType() {
s.Require().Equal(store.StoreTypeBranch, s.kvStore.GetStoreType())
}
func (s *StoreTestSuite) TestGetChangeset() {
// initial store with no writes should have an empty changeset
cs := s.kvStore.GetChangeset()
s.Require().Zero(cs.Size())
// perform some writes
s.kvStore.Set([]byte("key000"), []byte("updated_val000"))
s.kvStore.Delete([]byte("key001"))
cs = s.kvStore.GetChangeset()
s.Require().Equal(cs.Size(), 2)
}
func (s *StoreTestSuite) TestReset() {
s.Require().NoError(s.kvStore.Reset(1))
cs := s.kvStore.GetChangeset()
s.Require().Zero(cs.Size())
}
func (s *StoreTestSuite) TestGet() {
// perform read of key000, which is not dirty
bz := s.kvStore.Get([]byte("key000"))
s.Require().Equal([]byte("val000"), bz)
// update key000 and perform a read which should reflect the new value
s.kvStore.Set([]byte("key000"), []byte("updated_val000"))
bz = s.kvStore.Get([]byte("key000"))
s.Require().Equal([]byte("updated_val000"), bz)
// ensure the primary SS backend is not modified
bz, err := s.storage.Get(storeKey, 1, []byte("key000"))
s.Require().NoError(err)
s.Require().Equal([]byte("val000"), bz)
}
func (s *StoreTestSuite) TestHas() {
// perform read of key000, which is not dirty thus falling back to SS
ok := s.kvStore.Has([]byte("key000"))
s.Require().True(ok)
ok = s.kvStore.Has([]byte("key100"))
s.Require().False(ok)
// perform a write of a brand new key not in SS, but in the changeset
s.kvStore.Set([]byte("key100"), []byte("val100"))
ok = s.kvStore.Has([]byte("key100"))
s.Require().True(ok)
}
func (s *StoreTestSuite) TestBranch() {
// perform a few writes on the original store
s.kvStore.Set([]byte("key000"), []byte("updated_val000"))
s.kvStore.Set([]byte("key001"), []byte("updated_val001"))
// create a new branch
b := s.kvStore.Branch()
// update an existing dirty write
b.Set([]byte("key001"), []byte("branched_updated_val001"))
// perform reads on the branched store without writing first
// key000 is dirty in the original store, but not in the branched store
s.Require().Equal([]byte("updated_val000"), b.Get([]byte("key000")))
// key001 is dirty in both the original and branched store, but branched store
// should reflect the branched write.
s.Require().Equal([]byte("branched_updated_val001"), b.Get([]byte("key001")))
// key002 is not dirty in either store, so should fall back to SS
s.Require().Equal([]byte("val002"), b.Get([]byte("key002")))
// ensure the original store is not modified
s.Require().Equal([]byte("updated_val001"), s.kvStore.Get([]byte("key001")))
s.Require().Equal(1, b.GetChangeset().Size())
s.Require().Equal([]byte("key001"), b.GetChangeset().Pairs[storeKey][0].Key)
// write the branched store and ensure all writes are flushed to the parent
b.Write()
s.Require().Equal([]byte("branched_updated_val001"), s.kvStore.Get([]byte("key001")))
s.Require().Equal(2, s.kvStore.GetChangeset().Size())
}
func (s *StoreTestSuite) TestIterator_NoWrites() {
// iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.Iterator([]byte("key000"), nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.Iterator(nil, []byte("key100"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.Iterator([]byte("key000"), []byte("key050"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.Iterator(nil, nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}
func (s *StoreTestSuite) TestIterator_DirtyWrites() {
// modify all even keys
for i := 0; i < 100; i++ {
if i%2 == 0 {
key := fmt.Sprintf("key%03d", i) // key000, key002, ...
val := fmt.Sprintf("updated_val%03d", i) // updated_val000, updated_val002, ...
s.kvStore.Set([]byte(key), []byte(val))
}
}
// add some new keys to ensure we cover those as well
for i := 100; i < 150; i++ {
key := fmt.Sprintf("key%03d", i) // key100, key101, ...
val := fmt.Sprintf("val%03d", i) // val100, val101, ...
s.kvStore.Set([]byte(key), []byte(val))
}
// iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.Iterator([]byte("key000"), nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i++
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.Iterator(nil, []byte("key150"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i++
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.Iterator([]byte("key000"), []byte("key050"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i++
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.Iterator(nil, nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i++
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}
func (s *StoreTestSuite) TestReverseIterator_NoWrites() {
// reverse iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), nil)
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.ReverseIterator(nil, []byte("key100"))
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), []byte("key050"))
defer itr.Close()
i := 49
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.ReverseIterator(nil, nil)
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}
func (s *StoreTestSuite) TestReverseIterator_DirtyWrites() {
// modify all even keys
for i := 0; i < 100; i++ {
if i%2 == 0 {
key := fmt.Sprintf("key%03d", i) // key000, key002, ...
val := fmt.Sprintf("updated_val%03d", i) // updated_val000, updated_val002, ...
s.kvStore.Set([]byte(key), []byte(val))
}
}
// add some new keys to ensure we cover those as well
for i := 100; i < 150; i++ {
key := fmt.Sprintf("key%03d", i) // key100, key101, ...
val := fmt.Sprintf("val%03d", i) // val100, val101, ...
s.kvStore.Set([]byte(key), []byte(val))
}
// reverse iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), nil)
defer itr.Close()
i := 149
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), "itr_key: %s, count: %d", string(itr.Key()), count)
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i--
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.ReverseIterator(nil, []byte("key150"))
defer itr.Close()
i := 149
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i--
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), []byte("key050"))
defer itr.Close()
i := 49
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i--
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.ReverseIterator(nil, nil)
defer itr.Close()
i := 149
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
if i%2 == 0 && i < 100 {
s.Require().Equal([]byte(fmt.Sprintf("updated_val%03d", i)), itr.Value())
} else {
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
}
i--
count++
}
s.Require().Equal(150, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}

View File

@ -1,8 +0,0 @@
# Memory KVStore
The `mem.Store` implementation defines an in-memory `KVStore`, which is internally
backed by a thread-safe BTree. The `mem.Store` does not provide any branching
functionality and should be used as an ephemeral store, typically reset between
blocks. A `mem.Store` contains no reference to a parent store, but can be used
as a parent store for other stores. The `mem.Store` is can be useful for testing
purposes and where state persistence is not required or should be ephemeral.

View File

@ -1,122 +0,0 @@
package mem
import (
"bytes"
"github.com/tidwall/btree"
"golang.org/x/exp/slices"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
var _ corestore.Iterator = (*iterator)(nil)
type iterator struct {
treeItr btree.IterG[store.KVPair]
start []byte
end []byte
reverse bool
valid bool
}
func newIterator(tree *btree.BTreeG[store.KVPair], start, end []byte, reverse bool) corestore.Iterator {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
panic(store.ErrKeyEmpty)
}
if start != nil && end != nil && bytes.Compare(start, end) > 0 {
panic(store.ErrStartAfterEnd)
}
iter := tree.Iter()
var valid bool
if reverse {
if end != nil {
valid = iter.Seek(store.KVPair{Key: end, Value: nil})
if !valid {
valid = iter.Last()
} else {
valid = iter.Prev() // end is exclusive
}
} else {
valid = iter.Last()
}
} else {
if start != nil {
valid = iter.Seek(store.KVPair{Key: start, Value: nil})
} else {
valid = iter.First()
}
}
itr := &iterator{
treeItr: iter,
start: start,
end: end,
reverse: reverse,
valid: valid,
}
if itr.valid {
itr.valid = itr.keyInRange(itr.Key())
}
return itr
}
// Domain returns the domain of the iterator. The caller must not modify the
// return values.
func (itr *iterator) Domain() ([]byte, []byte) {
return itr.start, itr.end
}
func (itr *iterator) Valid() bool {
return itr.valid
}
func (itr *iterator) Key() []byte {
return slices.Clone(itr.treeItr.Item().Key)
}
func (itr *iterator) Value() []byte {
return slices.Clone(itr.treeItr.Item().Value)
}
func (itr *iterator) Next() {
if !itr.valid {
return
}
if !itr.reverse {
itr.valid = itr.treeItr.Next()
} else {
itr.valid = itr.treeItr.Prev()
}
if itr.valid {
itr.valid = itr.keyInRange(itr.Key())
}
return
}
func (itr *iterator) Close() error {
itr.treeItr.Release()
return nil
}
func (itr *iterator) Error() error {
return nil
}
func (itr *iterator) keyInRange(key []byte) bool {
if !itr.reverse && itr.end != nil && bytes.Compare(key, itr.end) >= 0 {
return false
}
if itr.reverse && itr.start != nil && bytes.Compare(key, itr.start) < 0 {
return false
}
return true
}

View File

@ -1,102 +0,0 @@
package mem
import (
"bytes"
"github.com/tidwall/btree"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
// degree defines the approximate number of items and children per B-tree node.
const degree = 32
var _ store.KVStore = (*Store)(nil)
// Store defines an in-memory KVStore backed by a BTree for storage, indexing,
// and iteration. Note, the store is ephemeral and does not support commitment.
// If using the store between blocks or commitments, the caller must ensure to
// either create a new store or call Reset() on the existing store.
type Store struct {
storeKey string
tree *btree.BTreeG[store.KVPair]
}
func New(storeKey string) store.KVStore {
return &Store{
storeKey: storeKey,
tree: btree.NewBTreeGOptions(
func(a, b store.KVPair) bool { return bytes.Compare(a.Key, b.Key) <= -1 },
btree.Options{
Degree: degree,
NoLocks: false,
}),
}
}
func (s *Store) GetStoreKey() string {
return s.storeKey
}
func (s *Store) GetStoreType() store.StoreType {
return store.StoreTypeMem
}
func (s *Store) Get(key []byte) []byte {
store.AssertValidKey(key)
kvPair, ok := s.tree.Get(store.KVPair{Key: key})
if !ok || kvPair.Value == nil {
return nil
}
return kvPair.Value
}
func (s *Store) Has(key []byte) bool {
store.AssertValidKey(key)
return s.Get(key) != nil
}
func (s *Store) Set(key, value []byte) {
store.AssertValidKey(key)
store.AssertValidValue(value)
s.tree.Set(store.KVPair{Key: key, Value: value})
}
func (s *Store) Delete(key []byte) {
store.AssertValidKey(key)
s.tree.Set(store.KVPair{Key: key, Value: nil})
}
func (s *Store) GetChangeset() *store.Changeset {
itr := s.Iterator(nil, nil)
defer itr.Close()
var kvPairs store.KVPairs
for ; itr.Valid(); itr.Next() {
kvPairs = append(kvPairs, store.KVPair{
Key: itr.Key(),
Value: itr.Value(),
})
}
return store.NewChangesetWithPairs(map[string]store.KVPairs{s.storeKey: kvPairs})
}
func (s *Store) Reset(_ uint64) error {
s.tree.Clear()
return nil
}
func (s *Store) Iterator(start, end []byte) corestore.Iterator {
return newIterator(s.tree, start, end, false)
}
func (s *Store) ReverseIterator(start, end []byte) corestore.Iterator {
return newIterator(s.tree, start, end, true)
}

View File

@ -1,250 +0,0 @@
package mem_test
import (
"fmt"
"testing"
"github.com/stretchr/testify/suite"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/mem"
)
const storeKey = "storeKey"
type StoreTestSuite struct {
suite.Suite
kvStore store.KVStore
}
func TestStorageTestSuite(t *testing.T) {
suite.Run(t, &StoreTestSuite{})
}
func (s *StoreTestSuite) SetupTest() {
s.kvStore = mem.New(storeKey)
}
func (s *StoreTestSuite) TestGetStoreType() {
s.Require().Equal(store.StoreTypeMem, s.kvStore.GetStoreType())
}
func (s *StoreTestSuite) TestGetChangeset() {
// initial store with no writes should have an empty changeset
cs := s.kvStore.GetChangeset()
s.Require().Zero(cs.Size())
// perform some writes
s.kvStore.Set([]byte("key000"), []byte("updated_val000"))
s.kvStore.Delete([]byte("key001"))
cs = s.kvStore.GetChangeset()
s.Require().Equal(cs.Size(), 2)
}
func (s *StoreTestSuite) TestReset() {
s.Require().NoError(s.kvStore.Reset(1))
cs := s.kvStore.GetChangeset()
s.Require().Zero(cs.Size())
}
func (s *StoreTestSuite) TestCRUD() {
bz := s.kvStore.Get([]byte("key000"))
s.Require().Nil(bz)
s.Require().False(s.kvStore.Has([]byte("key000")))
s.kvStore.Set([]byte("key000"), []byte("val000"))
bz = s.kvStore.Get([]byte("key000"))
s.Require().Equal([]byte("val000"), bz)
s.Require().True(s.kvStore.Has([]byte("key000")))
s.kvStore.Delete([]byte("key000"))
bz = s.kvStore.Get([]byte("key000"))
s.Require().Nil(bz)
s.Require().False(s.kvStore.Has([]byte("key000")))
}
func (s *StoreTestSuite) TestIterator() {
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
s.kvStore.Set([]byte(key), []byte(val))
}
// iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.Iterator([]byte("key000"), nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.Iterator(nil, []byte("key100"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.Iterator([]byte("key000"), []byte("key050"))
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.Iterator(nil, nil)
defer itr.Close()
var i, count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i++
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}
func (s *StoreTestSuite) TestReverseIterator() {
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
s.kvStore.Set([]byte(key), []byte(val))
}
// reverse iterator without an end domain
s.Run("start_only", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), nil)
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator without a start domain
s.Run("end_only", func() {
itr := s.kvStore.ReverseIterator(nil, []byte("key100"))
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator with with a start and end domain
s.Run("start_and_end", func() {
itr := s.kvStore.ReverseIterator([]byte("key000"), []byte("key050"))
defer itr.Close()
i := 49
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(50, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
// reverse iterator with an open domain
s.Run("open_domain", func() {
itr := s.kvStore.ReverseIterator(nil, nil)
defer itr.Close()
i := 99
var count int
for ; itr.Valid(); itr.Next() {
s.Require().Equal([]byte(fmt.Sprintf("key%03d", i)), itr.Key(), string(itr.Key()))
s.Require().Equal([]byte(fmt.Sprintf("val%03d", i)), itr.Value())
i--
count++
}
s.Require().Equal(100, count)
s.Require().NoError(itr.Error())
// seek past domain, which should make the iterator invalid and produce an error
s.Require().False(itr.Valid())
})
}

View File

@ -1,15 +0,0 @@
# Trace KVStore
The `trace.Store` implementation defines a store which wraps a parent `KVStore`
and traces all operations performed on it. Each trace operation is written to a
provided `io.Writer` object. Specifically, a `TraceOperation` object is JSON
encoded and written to the writer. The `TraceOperation` object contains the exact
operation, e.g. a read or write, and the corresponding key and value pair.
A `trace.Store` can also be instantiated with a `store.TraceContext` which
can allow each traced operation to include additional metadata, e.g. a block height
or hash.
Note, `trace.Store` is not meant to be branched or written to. The parent `KVStore`
is responsible for all branching and writing operations, while a `trace.Store`
wraps such a store and traces all relevant operations on it.

View File

@ -1,7 +0,0 @@
/*
Package trace provides a KVStore implementation that wraps a parent KVStore
and allows all operations to be traced to an io.Writer. This can be useful to
serve use cases such as tracing and digesting all read operations for a specific
store key and key or value.
*/
package trace

View File

@ -1,58 +0,0 @@
package trace
import (
"io"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
var _ corestore.Iterator = (*iterator)(nil)
type iterator struct {
parent corestore.Iterator
writer io.Writer
context store.TraceContext
}
func newIterator(w io.Writer, parent corestore.Iterator, tc store.TraceContext) corestore.Iterator {
return &iterator{
parent: parent,
writer: w,
context: tc,
}
}
func (itr *iterator) Domain() ([]byte, []byte) {
return itr.parent.Domain()
}
func (itr *iterator) Valid() bool {
return itr.parent.Valid()
}
func (itr *iterator) Next() {
itr.parent.Next()
}
func (itr *iterator) Error() error {
return itr.parent.Error()
}
func (itr *iterator) Close() error {
return itr.parent.Close()
}
func (itr *iterator) Key() []byte {
key := itr.parent.Key()
writeOperation(itr.writer, IterKeyOp, itr.context, key, nil)
return key
}
func (itr *iterator) Value() []byte {
value := itr.parent.Value()
writeOperation(itr.writer, IterValueOp, itr.context, nil, value)
return value
}

View File

@ -1,136 +0,0 @@
package trace
import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"github.com/cockroachdb/errors"
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
// Operation types for tracing KVStore operations.
const (
WriteOp = "write"
ReadOp = "read"
DeleteOp = "delete"
IterKeyOp = "iterKey"
IterValueOp = "iterValue"
)
var _ store.BranchedKVStore = (*Store)(nil)
type (
// Store defines a KVStore used for tracing capabilities, which typically wraps
// another KVStore implementation.
Store struct {
parent store.KVStore
context store.TraceContext
writer io.Writer
}
// TraceOperation defines a traced KVStore operation, such as a read or write
TraceOperation struct {
Operation string `json:"operation"`
Key string `json:"key"`
Value string `json:"value"`
Metadata map[string]any `json:"metadata"`
}
)
func New(p store.KVStore, w io.Writer, tc store.TraceContext) store.BranchedKVStore {
return &Store{
parent: p,
writer: w,
context: tc,
}
}
func (s *Store) GetStoreKey() string {
return s.parent.GetStoreKey()
}
func (s *Store) GetStoreType() store.StoreType {
return store.StoreTypeTrace
}
func (s *Store) GetChangeset() *store.Changeset {
return s.parent.GetChangeset()
}
func (s *Store) Get(key []byte) []byte {
value := s.parent.Get(key)
writeOperation(s.writer, ReadOp, s.context, key, value)
return value
}
func (s *Store) Has(key []byte) bool {
return s.parent.Has(key)
}
func (s *Store) Set(key, value []byte) {
writeOperation(s.writer, WriteOp, s.context, key, value)
s.parent.Set(key, value)
}
func (s *Store) Delete(key []byte) {
writeOperation(s.writer, DeleteOp, s.context, key, nil)
s.parent.Delete(key)
}
func (s *Store) Reset(toVersion uint64) error {
return s.parent.Reset(toVersion)
}
func (s *Store) Write() {
if b, ok := s.parent.(store.BranchedKVStore); ok {
b.Write()
}
}
func (s *Store) Branch() store.BranchedKVStore {
panic(fmt.Sprintf("cannot call Branch() on %T", s))
}
func (s *Store) BranchWithTrace(_ io.Writer, _ store.TraceContext) store.BranchedKVStore {
panic(fmt.Sprintf("cannot call BranchWithTrace() on %T", s))
}
func (s *Store) Iterator(start, end []byte) corestore.Iterator {
return newIterator(s.writer, s.parent.Iterator(start, end), s.context)
}
func (s *Store) ReverseIterator(start, end []byte) corestore.Iterator {
return newIterator(s.writer, s.parent.ReverseIterator(start, end), s.context)
}
// writeOperation writes a KVStore operation to the underlying io.Writer as
// JSON-encoded data where the key/value pair is base64 encoded.
func writeOperation(w io.Writer, op string, tc store.TraceContext, key, value []byte) {
traceOp := TraceOperation{
Operation: op,
Key: base64.StdEncoding.EncodeToString(key),
Value: base64.StdEncoding.EncodeToString(value),
}
if tc != nil {
traceOp.Metadata = tc
}
raw, err := json.Marshal(traceOp)
if err != nil {
panic(errors.Wrap(err, "failed to serialize trace operation"))
}
if _, err := w.Write(raw); err != nil {
panic(errors.Wrap(err, "failed to write trace operation"))
}
_, err = io.WriteString(w, "\n")
if err != nil {
panic(err)
}
}

View File

@ -1,265 +0,0 @@
package trace_test
import (
"bytes"
"fmt"
"io"
"testing"
"github.com/stretchr/testify/require"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/mem"
"cosmossdk.io/store/v2/kv/trace"
)
const storeKey = "storeKey"
var kvPairs = store.KVPairs{
{Key: []byte(fmt.Sprintf("key%0.8d", 1)), Value: []byte(fmt.Sprintf("value%0.8d", 1))},
{Key: []byte(fmt.Sprintf("key%0.8d", 2)), Value: []byte(fmt.Sprintf("value%0.8d", 2))},
{Key: []byte(fmt.Sprintf("key%0.8d", 3)), Value: []byte(fmt.Sprintf("value%0.8d", 3))},
}
func newTraceKVStore(w io.Writer) store.KVStore {
store := newEmptyTraceKVStore(w)
for _, kvPair := range kvPairs {
store.Set(kvPair.Key, kvPair.Value)
}
return store
}
func newEmptyTraceKVStore(w io.Writer) store.KVStore {
memKVStore := mem.New(storeKey)
tc := store.TraceContext(map[string]any{"blockHeight": 64})
return trace.New(memKVStore, w, tc)
}
func TestTraceKVStoreGet(t *testing.T) {
testCases := []struct {
key []byte
expectedValue []byte
expectedOut string
}{
{
key: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
expectedOut: "{\"operation\":\"read\",\"key\":\"a2V5MDAwMDAwMDE=\",\"value\":\"dmFsdWUwMDAwMDAwMQ==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
key: []byte("does-not-exist"),
expectedValue: nil,
expectedOut: "{\"operation\":\"read\",\"key\":\"ZG9lcy1ub3QtZXhpc3Q=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newTraceKVStore(&buf)
buf.Reset()
value := store.Get(tc.key)
require.Equal(t, tc.expectedValue, value)
require.Equal(t, tc.expectedOut, buf.String())
}
}
func TestTraceKVStoreSet(t *testing.T) {
testCases := []struct {
key []byte
value []byte
expectedOut string
}{
{
key: kvPairs[0].Key,
value: kvPairs[0].Value,
expectedOut: "{\"operation\":\"write\",\"key\":\"a2V5MDAwMDAwMDE=\",\"value\":\"dmFsdWUwMDAwMDAwMQ==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
key: kvPairs[1].Key,
value: kvPairs[1].Value,
expectedOut: "{\"operation\":\"write\",\"key\":\"a2V5MDAwMDAwMDI=\",\"value\":\"dmFsdWUwMDAwMDAwMg==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
key: kvPairs[2].Key,
value: kvPairs[2].Value,
expectedOut: "{\"operation\":\"write\",\"key\":\"a2V5MDAwMDAwMDM=\",\"value\":\"dmFsdWUwMDAwMDAwMw==\",\"metadata\":{\"blockHeight\":64}}\n",
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newEmptyTraceKVStore(&buf)
buf.Reset()
store.Set(tc.key, tc.value)
require.Equal(t, tc.expectedOut, buf.String())
}
var buf bytes.Buffer
store := newEmptyTraceKVStore(&buf)
require.Panics(t, func() { store.Set([]byte(""), []byte("value")) }, "setting an empty key should panic")
require.Panics(t, func() { store.Set(nil, []byte("value")) }, "setting a nil key should panic")
}
func TestTraceKVStoreDelete(t *testing.T) {
testCases := []struct {
key []byte
expectedOut string
}{
{
key: kvPairs[0].Key,
expectedOut: "{\"operation\":\"delete\",\"key\":\"a2V5MDAwMDAwMDE=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newTraceKVStore(&buf)
buf.Reset()
store.Delete(tc.key)
require.Equal(t, tc.expectedOut, buf.String())
}
}
func TestTraceKVStoreHas(t *testing.T) {
testCases := []struct {
key []byte
expected bool
}{
{
key: kvPairs[0].Key,
expected: true,
},
}
for _, tc := range testCases {
var buf bytes.Buffer
store := newTraceKVStore(&buf)
buf.Reset()
ok := store.Has(tc.key)
require.Equal(t, tc.expected, ok)
}
}
func TestTestTraceKVStoreIterator(t *testing.T) {
var buf bytes.Buffer
store := newTraceKVStore(&buf)
iterator := store.Iterator(nil, nil)
s, e := iterator.Domain()
require.Equal(t, []byte(nil), s)
require.Equal(t, []byte(nil), e)
testCases := []struct {
expectedKey []byte
expectedValue []byte
expectedKeyOut string
expectedvalueOut string
}{
{
expectedKey: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDE=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMQ==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
expectedKey: kvPairs[1].Key,
expectedValue: kvPairs[1].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDI=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMg==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
expectedKey: kvPairs[2].Key,
expectedValue: kvPairs[2].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDM=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMw==\",\"metadata\":{\"blockHeight\":64}}\n",
},
}
for _, tc := range testCases {
buf.Reset()
ka := iterator.Key()
require.Equal(t, tc.expectedKeyOut, buf.String())
buf.Reset()
va := iterator.Value()
require.Equal(t, tc.expectedvalueOut, buf.String())
require.Equal(t, tc.expectedKey, ka)
require.Equal(t, tc.expectedValue, va)
iterator.Next()
}
require.False(t, iterator.Valid())
}
func TestTestTraceKVStoreReverseIterator(t *testing.T) {
var buf bytes.Buffer
store := newTraceKVStore(&buf)
iterator := store.ReverseIterator(nil, nil)
s, e := iterator.Domain()
require.Equal(t, []byte(nil), s)
require.Equal(t, []byte(nil), e)
testCases := []struct {
expectedKey []byte
expectedValue []byte
expectedKeyOut string
expectedvalueOut string
}{
{
expectedKey: kvPairs[2].Key,
expectedValue: kvPairs[2].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDM=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMw==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
expectedKey: kvPairs[1].Key,
expectedValue: kvPairs[1].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDI=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMg==\",\"metadata\":{\"blockHeight\":64}}\n",
},
{
expectedKey: kvPairs[0].Key,
expectedValue: kvPairs[0].Value,
expectedKeyOut: "{\"operation\":\"iterKey\",\"key\":\"a2V5MDAwMDAwMDE=\",\"value\":\"\",\"metadata\":{\"blockHeight\":64}}\n",
expectedvalueOut: "{\"operation\":\"iterValue\",\"key\":\"\",\"value\":\"dmFsdWUwMDAwMDAwMQ==\",\"metadata\":{\"blockHeight\":64}}\n",
},
}
for _, tc := range testCases {
buf.Reset()
ka := iterator.Key()
require.Equal(t, tc.expectedKeyOut, buf.String())
buf.Reset()
va := iterator.Value()
require.Equal(t, tc.expectedvalueOut, buf.String())
require.Equal(t, tc.expectedKey, ka)
require.Equal(t, tc.expectedValue, va)
iterator.Next()
}
require.False(t, iterator.Valid())
}
func TestTraceKVStoreGetStoreType(t *testing.T) {
traceKVStore := newEmptyTraceKVStore(nil)
require.Equal(t, store.StoreTypeTrace, traceKVStore.GetStoreType())
}

View File

@ -0,0 +1,49 @@
package root
import (
corestore "cosmossdk.io/core/store"
"cosmossdk.io/store/v2"
)
var _ store.ReadOnlyRootStore = (*ReadOnlyAdapter)(nil)
// ReadOnlyAdapter defines an adapter around a RootStore that only exposes read-only
// operations. This is useful for exposing a read-only view of the RootStore at
// a specific version in history, which could also be the latest state.
type ReadOnlyAdapter struct {
rootStore store.RootStore
version uint64
}
func NewReadOnlyAdapter(v uint64, rs store.RootStore) *ReadOnlyAdapter {
return &ReadOnlyAdapter{
rootStore: rs,
version: v,
}
}
func (roa *ReadOnlyAdapter) Has(storeKey string, key []byte) (bool, error) {
val, err := roa.Get(storeKey, key)
if err != nil {
return false, err
}
return val != nil, nil
}
func (roa *ReadOnlyAdapter) Get(storeKey string, key []byte) ([]byte, error) {
result, err := roa.rootStore.Query(storeKey, roa.version, key, false)
if err != nil {
return nil, err
}
return result.Value, nil
}
func (roa *ReadOnlyAdapter) Iterator(storeKey string, start, end []byte) (corestore.Iterator, error) {
return roa.rootStore.GetStateStorage().Iterator(storeKey, roa.version, start, end)
}
func (roa *ReadOnlyAdapter) ReverseIterator(storeKey string, start, end []byte) (corestore.Iterator, error) {
return roa.rootStore.GetStateStorage().ReverseIterator(storeKey, roa.version, start, end)
}

View File

@ -3,7 +3,6 @@ package root
import (
"bytes"
"fmt"
"io"
"slices"
"time"
@ -12,8 +11,6 @@ import (
coreheader "cosmossdk.io/core/header"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/kv/branch"
"cosmossdk.io/store/v2/kv/trace"
"cosmossdk.io/store/v2/metrics"
"cosmossdk.io/store/v2/pruning"
)
@ -21,9 +18,9 @@ import (
var _ store.RootStore = (*Store)(nil)
// Store defines the SDK's default RootStore implementation. It contains a single
// State Storage (SS) backend and a single State Commitment (SC) backend. Note,
// this means all store keys are ignored and commitments exist in a single commitment
// tree.
// State Storage (SS) backend and a single State Commitment (SC) backend. The SC
// backend may or may not support multiple store keys and is implementation
// dependent.
type Store struct {
logger log.Logger
initialVersion uint64
@ -34,11 +31,6 @@ type Store struct {
// stateCommitment reflects the state commitment (SC) backend
stateCommitment store.Committer
// kvStores reflects a mapping of store keys, typically dedicated to modules,
// to a dedicated BranchedKVStore. Each store is used to accumulate writes
// and branch off of.
kvStores map[string]store.BranchedKVStore
// commitHeader reflects the header used when committing state (note, this isn't required and only used for query purposes)
commitHeader *coreheader.Info
@ -48,12 +40,6 @@ type Store struct {
// workingHash defines the current (yet to be committed) hash
workingHash []byte
// traceWriter defines a writer for store tracing operation
traceWriter io.Writer
// traceContext defines the tracing context, if any, for trace operations
traceContext store.TraceContext
// pruningManager manages pruning of the SS and SC backends
pruningManager *pruning.Manager
@ -65,20 +51,9 @@ func New(
logger log.Logger,
ss store.VersionedDatabase,
sc store.Committer,
storeKeys []string,
ssOpts, scOpts pruning.Options,
m metrics.StoreMetrics,
) (store.RootStore, error) {
kvStores := make(map[string]store.BranchedKVStore, len(storeKeys))
for _, storeKey := range storeKeys {
bkv, err := branch.New(storeKey, ss)
if err != nil {
return nil, err
}
kvStores[storeKey] = bkv
}
pruningManager := pruning.NewManager(logger, ss, sc)
pruningManager.SetStorageOptions(ssOpts)
pruningManager.SetCommitmentOptions(scOpts)
@ -89,7 +64,6 @@ func New(
initialVersion: 1,
stateStore: ss,
stateCommitment: sc,
kvStores: kvStores,
pruningManager: pruningManager,
telemetry: m,
}, nil
@ -121,8 +95,32 @@ func (s *Store) SetInitialVersion(v uint64) error {
return s.stateCommitment.SetInitialVersion(v)
}
// GetSCStore returns the store's state commitment (SC) backend.
func (s *Store) GetSCStore() store.Committer {
func (s *Store) StateLatest() (uint64, store.ReadOnlyRootStore, error) {
v, err := s.GetLatestVersion()
if err != nil {
return 0, nil, err
}
return v, NewReadOnlyAdapter(v, s), nil
}
func (s *Store) StateAt(v uint64) (store.ReadOnlyRootStore, error) {
// TODO(bez): Ensure the version <v> exists. We can utilize the GetCommitInfo()
// SC method once available.
//
// Ref: https://github.com/cosmos/cosmos-sdk/pull/18736
// if err := s.stateCommitment.GetCommitInfo(v); err != nil {
// return nil, fmt.Errorf("failed to get commit info for version %d: %w", v, err)
// }
return NewReadOnlyAdapter(v, s), nil
}
func (s *Store) GetStateStorage() store.VersionedDatabase {
return s.stateStore
}
func (s *Store) GetStateCommitment() store.Committer {
return s.stateCommitment
}
@ -198,30 +196,6 @@ func (s *Store) Query(storeKey string, version uint64, key []byte, prove bool) (
return result, nil
}
// GetKVStore returns a KVStore for the given store key. Any writes to this store
// without branching will be committed to SC and SS upon Commit(). Branching will create
// a branched KVStore that allow writes to be discarded and propagated to the
// root KVStore using Write().
func (s *Store) GetKVStore(storeKey string) store.KVStore {
bkv, ok := s.kvStores[storeKey]
if !ok {
panic(fmt.Sprintf("unknown store key: %s", storeKey))
}
if s.TracingEnabled() {
return trace.New(bkv, s.traceWriter, s.traceContext)
}
return bkv
}
func (s *Store) GetBranchedKVStore(storeKey string) store.BranchedKVStore {
// Branching will soon be removed.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/18981
panic("TODO: WILL BE REMOVED!")
}
func (s *Store) LoadLatestVersion() error {
if s.telemetry != nil {
now := time.Now()
@ -248,14 +222,6 @@ func (s *Store) LoadVersion(version uint64) error {
func (s *Store) loadVersion(v uint64) error {
s.logger.Debug("loading version", "version", v)
// Reset each KVStore s.t. the latest version is v. Any writes will overwrite
// existing versions.
for storeKey, kvStore := range s.kvStores {
if err := kvStore.Reset(v); err != nil {
return fmt.Errorf("failed to reset %s KVStore: %w", storeKey, err)
}
}
if err := s.stateCommitment.LoadVersion(v); err != nil {
return fmt.Errorf("failed to load SS version %d: %w", v, err)
}
@ -269,29 +235,10 @@ func (s *Store) loadVersion(v uint64) error {
return nil
}
func (s *Store) SetTracingContext(tc store.TraceContext) {
s.traceContext = tc
}
func (s *Store) SetTracer(w io.Writer) {
s.traceWriter = w
}
func (s *Store) TracingEnabled() bool {
return s.traceWriter != nil
}
func (s *Store) SetCommitHeader(h *coreheader.Info) {
s.commitHeader = h
}
func (s *Store) Branch() store.BranchedRootStore {
// Branching will soon be removed.
//
// Ref: https://github.com/cosmos/cosmos-sdk/issues/18981
panic("TODO: WILL BE REMOVED!")
}
// WorkingHash returns the working hash of the root store. Note, WorkingHash()
// should only be called once per block once all writes are complete and prior
// to Commit() being called.
@ -299,14 +246,14 @@ func (s *Store) Branch() store.BranchedRootStore {
// If working hash is nil, then we need to compute and set it on the root store
// by constructing a CommitInfo object, which in turn creates and writes a batch
// of the current changeset to the SC tree.
func (s *Store) WorkingHash() ([]byte, error) {
func (s *Store) WorkingHash(cs *store.Changeset) ([]byte, error) {
if s.telemetry != nil {
now := time.Now()
s.telemetry.MeasureSince(now, "root_store", "working_hash")
}
if s.workingHash == nil {
if err := s.writeSC(); err != nil {
if err := s.writeSC(cs); err != nil {
return nil, err
}
@ -316,21 +263,12 @@ func (s *Store) WorkingHash() ([]byte, error) {
return slices.Clone(s.workingHash), nil
}
func (s *Store) Write() {
for _, kvStore := range s.kvStores {
kvStore.Write()
}
}
// Commit commits all state changes to the underlying SS and SC backends. Note,
// at the time of Commit(), we expect WorkingHash() to have already been called,
// which internally sets the working hash, retrieved by writing a batch of the
// changeset to the SC tree, and CommitInfo on the root store. The changeset is
// retrieved from the rootKVStore and represents the entire set of writes to be
// committed. The same changeset is used to flush writes to the SS backend.
//
// Note, Commit() commits SC and SC synchronously.
func (s *Store) Commit() ([]byte, error) {
// at the time of Commit(), we expect WorkingHash() to have already been called
// with the same Changeset, which internally sets the working hash, retrieved by
// writing a batch of the changeset to the SC tree, and CommitInfo on the root
// store.
func (s *Store) Commit(cs *store.Changeset) ([]byte, error) {
if s.telemetry != nil {
now := time.Now()
s.telemetry.MeasureSince(now, "root_store", "commit")
@ -346,18 +284,13 @@ func (s *Store) Commit() ([]byte, error) {
s.logger.Debug("commit header and version mismatch", "header_height", s.commitHeader.Height, "version", version)
}
changeset := store.NewChangeset()
for _, kvStore := range s.kvStores {
changeset.Merge(kvStore.GetChangeset())
}
// commit SS
if err := s.stateStore.ApplyChangeset(version, changeset); err != nil {
if err := s.stateStore.ApplyChangeset(version, cs); err != nil {
return nil, fmt.Errorf("failed to commit SS: %w", err)
}
// commit SC
if err := s.commitSC(); err != nil {
if err := s.commitSC(cs); err != nil {
return nil, fmt.Errorf("failed to commit SC stores: %w", err)
}
@ -365,12 +298,6 @@ func (s *Store) Commit() ([]byte, error) {
s.lastCommitInfo.Timestamp = s.commitHeader.Time
}
for storeKey, kvStore := range s.kvStores {
if err := kvStore.Reset(version); err != nil {
return nil, fmt.Errorf("failed to reset %s KVStore: %w", storeKey, err)
}
}
s.workingHash = nil
// prune SS and SC
@ -379,17 +306,12 @@ func (s *Store) Commit() ([]byte, error) {
return s.lastCommitInfo.Hash(), nil
}
// writeSC gets the current changeset from the rootKVStore and writes that as a
// batch to the underlying SC tree, which allows us to retrieve the working hash
// of the SC tree. Finally, we construct a *CommitInfo and return the hash.
// Note, this should only be called once per block!
func (s *Store) writeSC() error {
changeset := store.NewChangeset()
for _, kvStore := range s.kvStores {
changeset.Merge(kvStore.GetChangeset())
}
if err := s.stateCommitment.WriteBatch(changeset); err != nil {
// writeSC accepts a Changeset and writes that as a batch to the underlying SC
// tree, which allows us to retrieve the working hash of the SC tree. Finally,
// we construct a *CommitInfo and set that as lastCommitInfo. Note, this should
// only be called once per block!
func (s *Store) writeSC(cs *store.Changeset) error {
if err := s.stateCommitment.WriteBatch(cs); err != nil {
return fmt.Errorf("failed to write batch to SC store: %w", err)
}
@ -421,7 +343,7 @@ func (s *Store) writeSC() error {
// should have already been written to the SC via WorkingHash(). This method
// solely commits that batch. An error is returned if commit fails or if the
// resulting commit hash is not equivalent to the working hash.
func (s *Store) commitSC() error {
func (s *Store) commitSC(cs *store.Changeset) error {
commitStoreInfos, err := s.stateCommitment.Commit()
if err != nil {
return fmt.Errorf("failed to commit SC store: %w", err)
@ -432,7 +354,7 @@ func (s *Store) commitSC() error {
StoreInfos: commitStoreInfos,
}).Hash()
workingHash, err := s.WorkingHash()
workingHash, err := s.WorkingHash(cs)
if err != nil {
return fmt.Errorf("failed to get working hash: %w", err)
}

View File

@ -2,12 +2,12 @@ package root
import (
"fmt"
"io"
"testing"
dbm "github.com/cosmos/cosmos-db"
"github.com/stretchr/testify/suite"
coreheader "cosmossdk.io/core/header"
"cosmossdk.io/log"
"cosmossdk.io/store/v2"
"cosmossdk.io/store/v2/commitment"
@ -18,7 +18,7 @@ import (
)
const (
testStoreKey = "test"
testStoreKey = "test_store_key"
)
type RootStoreTestSuite struct {
@ -42,14 +42,9 @@ func (s *RootStoreTestSuite) SetupTest() {
sc, err := commitment.NewCommitStore(map[string]commitment.Tree{testStoreKey: tree}, noopLog)
s.Require().NoError(err)
rs, err := New(noopLog, ss, sc, []string{testStoreKey}, pruning.DefaultOptions(), pruning.DefaultOptions(), nil)
rs, err := New(noopLog, ss, sc, pruning.DefaultOptions(), pruning.DefaultOptions(), nil)
s.Require().NoError(err)
rs.SetTracer(io.Discard)
rs.SetTracingContext(store.TraceContext{
"test": s.T().Name(),
})
s.rootStore = rs
}
@ -58,13 +53,27 @@ func (s *RootStoreTestSuite) TearDownTest() {
s.Require().NoError(err)
}
func (s *RootStoreTestSuite) TestGetSCStore() {
s.Require().Equal(s.rootStore.GetSCStore(), s.rootStore.(*Store).stateCommitment)
func (s *RootStoreTestSuite) TestGetStateCommitment() {
s.Require().Equal(s.rootStore.GetStateCommitment(), s.rootStore.(*Store).stateCommitment)
}
func (s *RootStoreTestSuite) TestGetKVStore() {
kvs := s.rootStore.GetKVStore(testStoreKey)
s.Require().NotNil(kvs)
func (s *RootStoreTestSuite) TestGetStateStorage() {
s.Require().Equal(s.rootStore.GetStateStorage(), s.rootStore.(*Store).stateStore)
}
func (s *RootStoreTestSuite) TestSetInitialVersion() {
s.Require().NoError(s.rootStore.SetInitialVersion(100))
}
func (s *RootStoreTestSuite) TestSetCommitHeader() {
h := &coreheader.Info{
Height: 100,
Hash: []byte("foo"),
ChainID: "test",
}
s.rootStore.SetCommitHeader(h)
s.Require().Equal(h, s.rootStore.(*Store).commitHeader)
}
func (s *RootStoreTestSuite) TestQuery() {
@ -72,14 +81,14 @@ func (s *RootStoreTestSuite) TestQuery() {
s.Require().Error(err)
// write and commit a changeset
bs := s.rootStore.GetKVStore(testStoreKey)
bs.Set([]byte("foo"), []byte("bar"))
cs := store.NewChangeset()
cs.Add(testStoreKey, []byte("foo"), []byte("bar"))
workingHash, err := s.rootStore.WorkingHash()
workingHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
s.Require().NotNil(workingHash)
commitHash, err := s.rootStore.Commit()
commitHash, err := s.rootStore.Commit(cs)
s.Require().NoError(err)
s.Require().NotNil(commitHash)
s.Require().Equal(workingHash, commitHash)
@ -95,15 +104,16 @@ func (s *RootStoreTestSuite) TestQuery() {
func (s *RootStoreTestSuite) TestLoadVersion() {
// write and commit a few changesets
for v := 1; v <= 5; v++ {
bs := s.rootStore.GetKVStore(testStoreKey)
val := fmt.Sprintf("val%03d", v) // val001, val002, ..., val005
bs.Set([]byte("key"), []byte(val))
workingHash, err := s.rootStore.WorkingHash()
cs := store.NewChangeset()
cs.Add(testStoreKey, []byte("key"), []byte(val))
workingHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
s.Require().NotNil(workingHash)
commitHash, err := s.rootStore.Commit()
commitHash, err := s.rootStore.Commit(cs)
s.Require().NoError(err)
s.Require().NotNil(commitHash)
s.Require().Equal(workingHash, commitHash)
@ -128,21 +138,25 @@ func (s *RootStoreTestSuite) TestLoadVersion() {
s.Require().Equal(uint64(3), latest)
// query state and ensure values returned are based on the loaded version
kvStore := s.rootStore.GetKVStore(testStoreKey)
val := kvStore.Get([]byte("key"))
_, ro, err := s.rootStore.StateLatest()
s.Require().NoError(err)
val, err := ro.Get(testStoreKey, []byte("key"))
s.Require().NoError(err)
s.Require().Equal([]byte("val003"), val)
// attempt to write and commit a few changesets
for v := 4; v <= 5; v++ {
bs := s.rootStore.GetKVStore(testStoreKey)
val := fmt.Sprintf("overwritten_val%03d", v) // overwritten_val004, overwritten_val005
bs.Set([]byte("key"), []byte(val))
workingHash, err := s.rootStore.WorkingHash()
cs := store.NewChangeset()
cs.Add(testStoreKey, []byte("key"), []byte(val))
workingHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
s.Require().NotNil(workingHash)
commitHash, err := s.rootStore.Commit()
commitHash, err := s.rootStore.Commit(cs)
s.Require().NoError(err)
s.Require().NotNil(commitHash)
s.Require().Equal(workingHash, commitHash)
@ -154,8 +168,11 @@ func (s *RootStoreTestSuite) TestLoadVersion() {
s.Require().Equal(uint64(5), latest)
// query state and ensure values returned are based on the loaded version
kvStore = s.rootStore.GetKVStore(testStoreKey)
val = kvStore.Get([]byte("key"))
_, ro, err = s.rootStore.StateLatest()
s.Require().NoError(err)
val, err = ro.Get(testStoreKey, []byte("key"))
s.Require().NoError(err)
s.Require().Equal([]byte("overwritten_val005"), val)
}
@ -165,23 +182,23 @@ func (s *RootStoreTestSuite) TestCommit() {
s.Require().Zero(lv)
// perform changes
bs2 := s.rootStore.GetKVStore(testStoreKey)
cs := store.NewChangeset()
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
bs2.Set([]byte(key), []byte(val))
cs.Add(testStoreKey, []byte(key), []byte(val))
}
// committing w/o calling WorkingHash should error
_, err = s.rootStore.Commit()
_, err = s.rootStore.Commit(cs)
s.Require().Error(err)
// execute WorkingHash and Commit
wHash, err := s.rootStore.WorkingHash()
wHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
cHash, err := s.rootStore.Commit()
cHash, err := s.rootStore.Commit(cs)
s.Require().NoError(err)
s.Require().Equal(wHash, cHash)
@ -190,15 +207,58 @@ func (s *RootStoreTestSuite) TestCommit() {
s.Require().NoError(err)
s.Require().Equal(uint64(1), lv)
// ensure the root KVStore is cleared
s.Require().Empty(s.rootStore.(*Store).kvStores[testStoreKey].GetChangeset().Size())
// perform reads on the updated root store
bs := s.rootStore.GetKVStore(testStoreKey)
_, ro, err := s.rootStore.StateLatest()
s.Require().NoError(err)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d", i) // val000, val001, ..., val099
s.Require().Equal([]byte(val), bs.Get([]byte(key)))
result, err := ro.Get(testStoreKey, []byte(key))
s.Require().NoError(err)
s.Require().Equal([]byte(val), result)
}
}
func (s *RootStoreTestSuite) TestStateAt() {
// write keys over multiple versions
for v := uint64(1); v <= 5; v++ {
// perform changes
cs := store.NewChangeset()
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d_%03d", i, v) // val000_1, val001_1, ..., val099_1
cs.Add(testStoreKey, []byte(key), []byte(val))
}
// execute WorkingHash and Commit
wHash, err := s.rootStore.WorkingHash(cs)
s.Require().NoError(err)
cHash, err := s.rootStore.Commit(cs)
s.Require().NoError(err)
s.Require().Equal(wHash, cHash)
}
lv, err := s.rootStore.GetLatestVersion()
s.Require().NoError(err)
s.Require().Equal(uint64(5), lv)
// ensure we can read state correctly at each version
for v := uint64(1); v <= 5; v++ {
ro, err := s.rootStore.StateAt(v)
s.Require().NoError(err)
for i := 0; i < 100; i++ {
key := fmt.Sprintf("key%03d", i) // key000, key001, ..., key099
val := fmt.Sprintf("val%03d_%03d", i, v) // val000_1, val001_1, ..., val099_1
result, err := ro.Get(testStoreKey, []byte(key))
s.Require().NoError(err)
s.Require().Equal([]byte(val), result)
}
}
}

View File

@ -8,48 +8,31 @@ import (
"cosmossdk.io/store/v2/metrics"
)
// StoreType defines a type of KVStore.
type StoreType int
// Sentinel store types.
const (
StoreTypeBranch StoreType = iota
StoreTypeTrace
StoreTypeMem
)
// RootStore defines an abstraction layer containing a State Storage (SS) engine
// and one or more State Commitment (SC) engines.
type RootStore interface {
// GetSCStore should return the SC backend.
GetSCStore() Committer
// GetKVStore returns the KVStore for the given store key. If an implementation
// chooses to have a single SS backend, the store key may be ignored.
GetKVStore(storeKey string) KVStore
// GetBranchedKVStore returns the KVStore for the given store key. If an
// implementation chooses to have a single SS backend, the store key may be
// ignored.
GetBranchedKVStore(storeKey string) BranchedKVStore
// StateLatest returns a read-only version of the RootStore at the latest
// height, alongside the associated version.
StateLatest() (uint64, ReadOnlyRootStore, error)
// StateAt is analogous to StateLatest() except it returns a read-only version
// of the RootStore at the provided version. If such a version cannot be found,
// an error must be returned.
StateAt(version uint64) (ReadOnlyRootStore, error)
// GetStateStorage returns the SS backend.
GetStateStorage() VersionedDatabase
// GetStateCommitment returns the SC backend.
GetStateCommitment() Committer
// Query performs a query on the RootStore for a given store key, version (height),
// and key tuple. Queries should be routed to the underlying SS engine.
Query(storeKey string, version uint64, key []byte, prove bool) (QueryResult, error)
// Branch should branch the entire RootStore, i.e. a copy of the original RootStore
// except with all internal KV store(s) branched.
Branch() BranchedRootStore
// SetTracingContext sets the tracing context, i.e tracing metadata, on the
// RootStore.
SetTracingContext(tc TraceContext)
// SetTracer sets the tracer on the RootStore, such that any calls to GetKVStore
// or GetBranchedKVStore, will have tracing enabled.
SetTracer(w io.Writer)
// TracingEnabled returns true if tracing is enabled on the RootStore.
TracingEnabled() bool
// LoadVersion loads the RootStore to the given version.
LoadVersion(version uint64) error
// LoadLatestVersion behaves identically to LoadVersion except it loads the
// latest version implicitly.
LoadLatestVersion() error
@ -65,19 +48,21 @@ type RootStore interface {
// queries based on block time need to be supported.
SetCommitHeader(h *coreheader.Info)
// WorkingHash returns the current WIP commitment hash. Depending on the underlying
// implementation, this may need to take the current changeset and write it to
// the SC backend(s). In such cases, Commit() would return this hash and flush
// writes to disk. This means that WorkingHash mutates the RootStore and must
// be called prior to Commit().
WorkingHash() ([]byte, error)
// Commit should be responsible for taking the current changeset and flushing
// WorkingHash returns the current WIP commitment hash by applying the Changeset
// to the SC backend. Typically, WorkingHash() is called prior to Commit() and
// must be applied with the exact same Changeset. This is because WorkingHash()
// is responsible for writing the Changeset to the SC backend and returning the
// resulting root hash. Then, Commit() would return this hash and flush writes
// to disk.
WorkingHash(cs *Changeset) ([]byte, error)
// Commit should be responsible for taking the provided changeset and flushing
// it to disk. Note, depending on the implementation, the changeset, at this
// point, may already be written to the SC backends. Commit() should ensure
// the changeset is committed to all SC and SC backends and flushed to disk.
// It must return a hash of the merkle-ized committed state. This hash should
// be the same as the hash returned by WorkingHash() prior to calling Commit().
Commit() ([]byte, error)
Commit(cs *Changeset) ([]byte, error)
// LastCommitID returns a CommitID pertaining to the last commitment.
LastCommitID() (CommitID, error)
@ -102,75 +87,19 @@ type UpgradeableRootStore interface {
LoadVersionAndUpgrade(version uint64, upgrades *StoreUpgrades) error
}
// BranchedRootStore defines an extension of the RootStore interface that allows
// for nested branching and flushing of writes. It extends RootStore by allowing
// a caller to call Branch() which should return a BranchedRootStore that has all
// internal relevant KV stores branched. A caller can then call Write() on the
// BranchedRootStore which will flush all changesets to the parent RootStore's
// internal KV stores.
type BranchedRootStore interface {
RootStore
// ReadOnlyRootStore defines a read-only interface for a RootStore.
type ReadOnlyRootStore interface {
// Has returns if a key exists in the read-only RootStore.
Has(storeKey string, key []byte) (bool, error)
Write()
}
// Get returns the value of a key, if it exists, in the read-only RootStore.
Get(storeKey string, key []byte) ([]byte, error)
// KVStore defines the core storage primitive for modules to read and write state.
type KVStore interface {
GetStoreKey() string
// Iterator returns an iterator over a given store key and domain.
Iterator(storeKey string, start, end []byte) (corestore.Iterator, error)
// GetStoreType returns the concrete store type.
GetStoreType() StoreType
// Get returns a value for a given key from the store.
Get(key []byte) []byte
// Has checks if a key exists.
Has(key []byte) bool
// Set sets a key/value entry to the store.
Set(key, value []byte)
// Delete deletes the key from the store.
Delete(key []byte)
// GetChangeset returns the ChangeSet, if any, for the branched state. This
// should contain all writes that are marked to be flushed and committed during
// Commit().
GetChangeset() *Changeset
// Reset resets the store, which is implementation dependent.
Reset(toVersion uint64) error
// Iterator creates a new Iterator over the domain [start, end). Note:
//
// - Start must be less than end
// - The iterator must be closed by caller
// - To iterate over entire domain, use store.Iterator(nil, nil)
//
// CONTRACT: No writes may happen within a domain while an iterator exists over
// it, with the exception of a branched/cached KVStore.
Iterator(start, end []byte) corestore.Iterator
// ReverseIterator creates a new reverse Iterator over the domain [start, end).
// It has the some properties and contracts as Iterator.
ReverseIterator(start, end []byte) corestore.Iterator
}
// BranchedKVStore defines an interface for a branched a KVStore. It extends KVStore
// by allowing dirty entries to be flushed to the underlying KVStore or discarded
// altogether. A BranchedKVStore can itself be branched, allowing for nested branching
// where writes are flushed up the branched stack.
type BranchedKVStore interface {
KVStore
// Write flushes writes to the underlying store.
Write()
// Branch recursively wraps.
Branch() BranchedKVStore
// BranchWithTrace recursively wraps with tracing enabled.
BranchWithTrace(w io.Writer, tc TraceContext) BranchedKVStore
// ReverseIterator returns a reverse iterator over a given store key and domain.
ReverseIterator(storeKey string, start, end []byte) (corestore.Iterator, error)
}
// QueryResult defines the response type to performing a query on a RootStore.