KVStore refactor (#303)

I removed that stuff about cwwMutex and CacheWraps expiring. It was unnecessary complexity given that users need not write concurrent logic. If you have 2 CacheKVStores and you Write on one and later you Write on the other one, they will both write, and strange things may happen. But that’s OK, it’s kinda the expected (unsurprising) behavior. And now the code is much simpler.

Also I got rid of IterKVStore. It’s not necessary to distinguish them because we can have KVStore.Iterator() return nil or panic or an empty iterator. Further simplification…

KVStore.CacheKVStore() doesn’t exist. No need, just call NewCacheKVStore(parent).

Originally I wanted the .CacheKVStore() method because you can make the implementor be responsible for ensuring that returned CacheWraps don’t trample each other. But as written previously this complicates the code a lot more and it isn’t strictly necessary. It’s a kind of magic that maybe should just be avoided.

sdk.Iterator is dbm.Iterator. The plan is to conservatively add more aliases into the “sdk” namespace.

GetError() is removed from Iterator. We need to just panic to be consistent. More simplification.
This commit is contained in:
Jae Kwon 2017-12-12 20:13:51 -08:00 committed by GitHub
parent 1a28c4b89c
commit eb1f877d3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 564 additions and 541 deletions

4
glide.lock generated
View File

@ -136,7 +136,7 @@ imports:
- data
- data/base58
- name: github.com/tendermint/iavl
version: ab22235a11524125a1df019e7b223d9797a88810
version: 47776d66ae2bce73d66a4a3118284549cc3dca9a
- name: github.com/tendermint/light-client
version: 76313d625e662ed7b284d066d68ff71edd7a9fac
subpackages:
@ -174,7 +174,7 @@ imports:
- types
- version
- name: github.com/tendermint/tmlibs
version: 17dc8a74497d3fee933592ef860275e6b0dd71d6
version: 5636a02d035258701974da39c62d13c1d76f8ae8
subpackages:
- autofile
- cli

View File

@ -1,255 +0,0 @@
package store
// TODO: Consider merge w/ tendermint/tmlibs/db/cachedb.go.
import (
"bytes"
"sort"
"sync"
"sync/atomic"
)
// If value is nil but deleted is false, it means the parent doesn't have the
// key. (No need to delete upon Write())
type cValue struct {
value []byte
deleted bool
dirty bool
}
// cacheIterKVStore wraps an in-memory cache around an underlying IterKVStore.
type cacheIterKVStore struct {
mtx sync.Mutex
cache map[string]cValue
parent IterKVStore
lockVersion interface{}
cwwMutex
}
var _ CacheIterKVStore = (*cacheIterKVStore)(nil)
// Users should typically not be required to call NewCacheIterKVStore directly, as the
// IterKVStore implementations here provide a .CacheIterKVStore() function already.
// `lockVersion` is typically provided by parent.GetWriteLockVersion().
func NewCacheIterKVStore(parent IterKVStore, lockVersion interface{}) *cacheIterKVStore {
ci := &cacheIterKVStore{
cache: make(map[string]cValue),
parent: parent,
lockVersion: lockVersion,
cwwMutex: NewCWWMutex(),
}
return ci
}
func (ci *cacheIterKVStore) Get(key []byte) (value []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
cacheValue, ok := ci.cache[string(key)]
if !ok {
value = ci.parent.Get(key)
ci.setCacheValue(key, value, false, false)
} else {
value = cacheValue.value
}
return value
}
func (ci *cacheIterKVStore) Set(key []byte, value []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
ci.setCacheValue(key, value, false, true)
}
func (ci *cacheIterKVStore) Has(key []byte) bool {
value := ci.Get(key)
return value != nil
}
func (ci *cacheIterKVStore) Remove(key []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
ci.setCacheValue(key, nil, true, true)
}
// Write writes pending updates to the parent database and clears the cache.
// NOTE: Not atomic.
func (ci *cacheIterKVStore) Write() {
ci.mtx.Lock()
defer ci.mtx.Unlock()
// Optional sanity check to ensure that cacheIterKVStore is valid
if parent, ok := ci.parent.(WriteLocker); ok {
if parent.TryWriteLock(ci.lockVersion) {
// All good!
} else {
panic("parent.Write() failed. Did this CacheIterKVStore expire?")
}
}
// We need a copy of all of the keys.
// Not the best, but probably not a bottleneck depending.
keys := make([]string, 0, len(ci.cache))
for key, dbValue := range ci.cache {
if dbValue.dirty {
keys = append(keys, key)
}
}
sort.Strings(keys)
// TODO in tmlibs/db we use Batch to write atomically.
// Consider locking the underlying IterKVStore during write.
for _, key := range keys {
cacheValue := ci.cache[key]
if cacheValue.deleted {
ci.parent.Remove([]byte(key))
} else if cacheValue.value == nil {
// Skip, it already doesn't exist in parent.
} else {
ci.parent.Set([]byte(key), cacheValue.value)
}
}
// Clear the cache
ci.cache = make(map[string]cValue)
}
//----------------------------------------
// To cache-wrap this cacheIterKVStore further.
func (ci *cacheIterKVStore) CacheWrap() CacheWrap {
return ci.CacheIterKVStore()
}
func (ci *cacheIterKVStore) CacheKVStore() CacheKVStore {
return ci.CacheIterKVStore()
}
func (ci *cacheIterKVStore) CacheIterKVStore() CacheIterKVStore {
return NewCacheIterKVStore(ci, ci.GetWriteLockVersion())
}
// If the parent parent DB implements this, (e.g. such as a cacheIterKVStore
// parent to a cacheIterKVStore child), cacheIterKVStore will call
// `parent.TryWriteLock()` before attempting to write.
// This prevents multiple siblings from Write'ing to the parent.
type WriteLocker interface {
GetWriteLockVersion() (lockVersion interface{})
TryWriteLock(lockVersion interface{}) bool
}
// Implements TryWriteLocker. Embed this in DB structs if desired.
type cwwMutex struct {
mtx sync.Mutex
// CONTRACT: reading/writing to `*written` should use `atomic.*`.
// CONTRACT: replacing `written` with another *int32 should use `.mtx`.
written *int32
}
func NewCWWMutex() cwwMutex {
return cwwMutex{
written: new(int32),
}
}
func (cww *cwwMutex) GetWriteLockVersion() interface{} {
cww.mtx.Lock()
defer cww.mtx.Unlock()
// `written` works as a "version" object because it gets replaced upon
// successful TryWriteLock.
return cww.written
}
func (cww *cwwMutex) TryWriteLock(version interface{}) bool {
cww.mtx.Lock()
defer cww.mtx.Unlock()
if version != cww.written {
return false // wrong "WriteLockVersion"
}
if !atomic.CompareAndSwapInt32(cww.written, 0, 1) {
return false // already written
}
// New "WriteLockVersion"
cww.written = new(int32)
return true
}
//----------------------------------------
// Iteration
func (ci *cacheIterKVStore) Iterator(start, end []byte) Iterator {
return ci.iterator(start, end, true)
}
func (ci *cacheIterKVStore) ReverseIterator(start, end []byte) Iterator {
return ci.iterator(start, end, false)
}
func (ci *cacheIterKVStore) iterator(start, end []byte, ascending bool) Iterator {
var parent, cache Iterator
if ascending {
parent = ci.parent.Iterator(start, end)
} else {
parent = ci.parent.ReverseIterator(start, end)
}
items := ci.dirtyItems(ascending)
cache = newMemIterator(start, end, items)
return newCacheMergeIterator(parent, cache, ascending)
}
func (ci *cacheIterKVStore) First(start, end []byte) (kv KVPair, ok bool) {
return iteratorFirst(ci, start, end)
}
func (ci *cacheIterKVStore) Last(start, end []byte) (kv KVPair, ok bool) {
return iteratorLast(ci, start, end)
}
// Constructs a slice of dirty items, to use w/ memIterator.
func (ci *cacheIterKVStore) dirtyItems(ascending bool) []KVPair {
items := make([]KVPair, 0, len(ci.cache))
for key, cacheValue := range ci.cache {
if !cacheValue.dirty {
continue
}
items = append(items,
KVPair{[]byte(key), cacheValue.value})
}
sort.Slice(items, func(i, j int) bool {
if ascending {
return bytes.Compare(items[i].Key, items[j].Key) < 0
} else {
return bytes.Compare(items[i].Key, items[j].Key) > 0
}
})
return items
}
//----------------------------------------
// etc
func (ci *cacheIterKVStore) assertValidKey(key []byte) {
if key == nil {
panic("key is nil")
}
}
// Only entrypoint to mutate ci.cache.
func (ci *cacheIterKVStore) setCacheValue(key, value []byte, deleted bool, dirty bool) {
cacheValue := cValue{
value: value,
deleted: deleted,
dirty: dirty,
}
ci.cache[string(key)] = cacheValue
}

172
store/cachekvstore.go Normal file
View File

@ -0,0 +1,172 @@
package store
import (
"bytes"
"sort"
"sync"
)
// cValue is a single cache entry.
// If value is nil but deleted is false, it means the parent doesn't have the
// key. (No need to delete upon Write())
type cValue struct {
	value   []byte // cached value; nil means "absent in parent" or "pending delete" depending on deleted
	deleted bool   // true if the key was Delete()d through this cache
	dirty   bool   // true if this entry must be flushed to the parent on Write()
}
// cacheKVStore wraps an in-memory cache around an underlying KVStore.
// Reads fall through to the parent and are memoized; writes and deletes
// are buffered until Write().
type cacheKVStore struct {
	mtx    sync.Mutex        // guards cache
	cache  map[string]cValue // key (as string) -> cached read/write state
	parent KVStore           // backing store, flushed to on Write()
}

// Compile-time check that *cacheKVStore satisfies CacheKVStore.
var _ CacheKVStore = (*cacheKVStore)(nil)
// NewCacheKVStore returns a cache layer over parent. All reads are
// memoized and all writes are buffered until Write() is called.
func NewCacheKVStore(parent KVStore) *cacheKVStore {
	return &cacheKVStore{
		cache:  make(map[string]cValue),
		parent: parent,
	}
}
// Get implements KVStore. It serves cached entries directly and reads
// through to the parent on a miss, memoizing the result (even a nil one).
// Panics on nil key.
func (ci *cacheKVStore) Get(key []byte) (value []byte) {
	ci.mtx.Lock()
	defer ci.mtx.Unlock()
	ci.assertValidKey(key)

	if cacheValue, ok := ci.cache[string(key)]; ok {
		return cacheValue.value
	}

	// Cache miss: read through and record a clean (non-dirty) entry.
	value = ci.parent.Get(key)
	ci.setCacheValue(key, value, false, false)
	return value
}
// Set implements KVStore. The write is buffered in the cache and only
// applied to the parent upon Write(). Panics on nil key.
func (ci *cacheKVStore) Set(key []byte, value []byte) {
	ci.mtx.Lock()
	defer ci.mtx.Unlock()
	ci.assertValidKey(key)
	ci.setCacheValue(key, value, false, true)
}
// Has implements KVStore. It reports whether key maps to a non-nil value.
// Note this reads through Get, so a miss is memoized as a side effect.
func (ci *cacheKVStore) Has(key []byte) bool {
	return ci.Get(key) != nil
}
// Delete implements KVStore. The deletion is buffered in the cache and
// only applied to the parent upon Write(). Panics on nil key.
func (ci *cacheKVStore) Delete(key []byte) {
	ci.mtx.Lock()
	defer ci.mtx.Unlock()
	ci.assertValidKey(key)
	// nil value + deleted=true marks a pending delete for Write().
	ci.setCacheValue(key, nil, true, true)
}
// Write writes pending updates to the parent database and clears the cache.
// Dirty entries are applied in sorted key order so the sequence of parent
// operations is deterministic. NOTE: Not atomic with respect to the parent.
func (ci *cacheKVStore) Write() {
	ci.mtx.Lock()
	defer ci.mtx.Unlock()

	// We need a copy of all of the keys.
	// Not the best, but probably not a bottleneck depending.
	keys := make([]string, 0, len(ci.cache))
	for key, dbValue := range ci.cache {
		if dbValue.dirty {
			keys = append(keys, key)
		}
	}
	sort.Strings(keys)

	// TODO in tmlibs/db we use Batch to write atomically.
	// Consider allowing usage of Batch.
	for _, key := range keys {
		cacheValue := ci.cache[key]
		if cacheValue.deleted {
			ci.parent.Delete([]byte(key))
		} else if cacheValue.value == nil {
			// Skip, it already doesn't exist in parent.
		} else {
			ci.parent.Set([]byte(key), cacheValue.value)
		}
	}

	// Clear the cache
	ci.cache = make(map[string]cValue)
}
//----------------------------------------
// To cache-wrap this cacheKVStore further.

// CacheWrap implements CacheWrapper by layering another cacheKVStore on
// top of this one.
func (ci *cacheKVStore) CacheWrap() CacheWrap {
	return NewCacheKVStore(ci)
}
//----------------------------------------
// Iteration

// Iterator implements KVStore: ascending iteration over [start, end),
// merging cached dirty entries with the parent's contents.
func (ci *cacheKVStore) Iterator(start, end []byte) Iterator {
	return ci.iterator(start, end, true)
}
// ReverseIterator implements KVStore: descending iteration over the
// domain, merging cached dirty entries with the parent's contents.
func (ci *cacheKVStore) ReverseIterator(start, end []byte) Iterator {
	return ci.iterator(start, end, false)
}
// iterator builds a merge iterator over the parent's iterator and an
// in-memory iterator over this cache's dirty entries, in the requested
// direction.
func (ci *cacheKVStore) iterator(start, end []byte, ascending bool) Iterator {
	var parent Iterator
	if ascending {
		parent = ci.parent.Iterator(start, end)
	} else {
		parent = ci.parent.ReverseIterator(start, end)
	}
	cache := newMemIterator(start, end, ci.dirtyItems(ascending))
	return newCacheMergeIterator(parent, cache, ascending)
}
// dirtyItems constructs a sorted slice of dirty items, to use w/ memIterator.
// Deleted entries are included (with nil values) so the merge iterator can
// shadow the corresponding parent entries.
func (ci *cacheKVStore) dirtyItems(ascending bool) []KVPair {
	items := make([]KVPair, 0, len(ci.cache))
	for key, cacheValue := range ci.cache {
		if !cacheValue.dirty {
			continue
		}
		items = append(items,
			KVPair{[]byte(key), cacheValue.value})
	}
	// `ascending` is loop-invariant: pick the comparator once instead of
	// re-testing the direction on every comparison inside sort.Slice.
	less := func(i, j int) bool {
		return bytes.Compare(items[i].Key, items[j].Key) < 0
	}
	if !ascending {
		less = func(i, j int) bool {
			return bytes.Compare(items[i].Key, items[j].Key) > 0
		}
	}
	sort.Slice(items, less)
	return items
}
//----------------------------------------
// etc

// assertValidKey panics when key is nil; nil keys are disallowed.
func (ci *cacheKVStore) assertValidKey(key []byte) {
	if key != nil {
		return
	}
	panic("key is nil")
}
// setCacheValue records an entry in the cache. This is the only entrypoint
// that mutates ci.cache; every caller in this file holds ci.mtx.
func (ci *cacheKVStore) setCacheValue(key, value []byte, deleted bool, dirty bool) {
	ci.cache[string(key)] = cValue{
		value:   value,
		deleted: deleted,
		dirty:   dirty,
	}
}

View File

@ -0,0 +1,65 @@
package store
import (
"testing"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tmlibs/db"
)
// TestCacheKVStore checks read-through Get, buffered Set/Delete, and that
// Write() flushes pending changes to the underlying MemDB.
func TestCacheKVStore(t *testing.T) {
	mem := dbm.NewMemDB()
	st := NewCacheKVStore(mem)

	require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty")

	// Seed both the backing store and the cache with the same value.
	mem.Set(bz("key1"), bz("value1"))
	st.Set(bz("key1"), bz("value1"))
	require.Equal(t, bz("value1"), st.Get(bz("key1")))

	// Updating through the cache must not touch mem until Write().
	st.Set(bz("key1"), bz("value2"))
	require.Equal(t, bz("value2"), st.Get(bz("key1")))
	require.Equal(t, bz("value1"), mem.Get(bz("key1")))

	// Write() flushes the buffered update to mem.
	st.Write()
	require.Equal(t, bz("value2"), mem.Get(bz("key1")))

	// Repeated Write() on the now-empty cache is a no-op (idempotent).
	st.Write()
	st.Write()
	require.Equal(t, bz("value2"), mem.Get(bz("key1")))

	// Delete is buffered too: mem keeps the key until Write().
	st = NewCacheKVStore(mem)
	st.Delete(bz("key1"))
	require.Empty(t, st.Get(bz("key1")))
	require.Equal(t, mem.Get(bz("key1")), bz("value2"))

	// Write() applies the pending delete to mem.
	st.Write()
	require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty")
	require.Empty(t, mem.Get(bz("key1")), "Expected `key1` to be empty")
}
// TestCacheKVStoreNested layers one cacheKVStore on another and checks
// that each Write() propagates exactly one level down.
func TestCacheKVStoreNested(t *testing.T) {
	mem := dbm.NewMemDB()
	st := NewCacheKVStore(mem)

	// Set on st: visible on st, not yet on mem.
	st.Set(bz("key1"), bz("value1"))
	require.Empty(t, mem.Get(bz("key1")))
	require.Equal(t, bz("value1"), st.Get(bz("key1")))

	// A child cache reads through to st.
	st2 := NewCacheKVStore(st)
	require.Equal(t, bz("value1"), st2.Get(bz("key1")))

	// Updating st2 affects only st2; st and mem are unchanged.
	st2.Set(bz("key1"), bz("VALUE2"))
	require.Equal(t, []byte(nil), mem.Get(bz("key1")))
	require.Equal(t, bz("value1"), st.Get(bz("key1")))
	require.Equal(t, bz("VALUE2"), st2.Get(bz("key1")))

	// st2.Write() flushes into st; mem is still untouched.
	st2.Write()
	require.Equal(t, []byte(nil), mem.Get(bz("key1")))
	require.Equal(t, bz("VALUE2"), st.Get(bz("key1")))

	// st.Write() finally flushes through to mem.
	st.Write()
	require.Equal(t, bz("VALUE2"), mem.Get(bz("key1")))
}
// bz is test shorthand for converting a string literal to a []byte.
func bz(s string) []byte {
	return []byte(s)
}

View File

@ -1,14 +1,12 @@
package store
import dbm "github.com/tendermint/tmlibs/db"
//----------------------------------------
// cacheMultiStore
// cacheMultiStore holds many cache-wrapped stores.
// Implements MultiStore.
type cacheMultiStore struct {
db dbm.CacheDB
db CacheKVStore
curVersion int64
lastCommitID CommitID
substores map[string]CacheWrap
@ -16,7 +14,7 @@ type cacheMultiStore struct {
func newCacheMultiStoreFromRMS(rms *rootMultiStore) cacheMultiStore {
cms := cacheMultiStore{
db: rms.db.CacheDB(),
db: NewCacheKVStore(rms.db),
curVersion: rms.curVersion,
lastCommitID: rms.lastCommitID,
substores: make(map[string]CacheWrap, len(rms.substores)),
@ -29,7 +27,7 @@ func newCacheMultiStoreFromRMS(rms *rootMultiStore) cacheMultiStore {
func newCacheMultiStoreFromCMS(cms cacheMultiStore) cacheMultiStore {
cms2 := cacheMultiStore{
db: cms.db.CacheDB(),
db: NewCacheKVStore(cms.db),
curVersion: cms.curVersion,
lastCommitID: cms.lastCommitID,
substores: make(map[string]CacheWrap, len(cms.substores)),
@ -60,7 +58,7 @@ func (cms cacheMultiStore) Write() {
// Implements CacheMultiStore
func (cms cacheMultiStore) CacheWrap() CacheWrap {
return cms.CacheMultiStore()
return cms.CacheMultiStore().(CacheWrap)
}
// Implements CacheMultiStore
@ -77,8 +75,3 @@ func (cms cacheMultiStore) GetStore(name string) interface{} {
func (cms cacheMultiStore) GetKVStore(name string) KVStore {
return cms.substores[name].(KVStore)
}
// Implements CacheMultiStore
func (cms cacheMultiStore) GetIterKVStore(name string) IterKVStore {
return cms.substores[name].(IterKVStore)
}

View File

@ -2,18 +2,19 @@ package store
import "bytes"
// Convenience for implementation of IterKVCache.First using IterKVCache.Iterator
func iteratorFirst(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
// Gets the first item.
func First(st KVStore, start, end []byte) (kv KVPair, ok bool) {
iter := st.Iterator(start, end)
if !iter.Valid() {
return kv, false
}
defer iter.Release()
return KVPair{iter.Key(), iter.Value()}, true
}
// Convenience for implementation of IterKVCache.Last using IterKVCache.ReverseIterator
func iteratorLast(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
// Gets the last item. `end` is exclusive.
func Last(st KVStore, start, end []byte) (kv KVPair, ok bool) {
iter := st.ReverseIterator(end, start)
if !iter.Valid() {
if v := st.Get(start); v != nil {

View File

@ -37,10 +37,10 @@ func (isl iavlStoreLoader) Load(id CommitID) (CommitStore, error) {
//----------------------------------------
var _ IterKVStore = (*iavlStore)(nil)
var _ KVStore = (*iavlStore)(nil)
var _ CommitStore = (*iavlStore)(nil)
// iavlStore Implements IterKVStore and CommitStore.
// iavlStore Implements KVStore and CommitStore.
type iavlStore struct {
// The underlying tree.
@ -81,44 +81,33 @@ func (st *iavlStore) Commit() CommitID {
}
}
// CacheWrap implements IterKVStore.
// CacheWrap implements KVStore.
func (st *iavlStore) CacheWrap() CacheWrap {
return st.CacheIterKVStore()
return NewCacheKVStore(st)
}
// CacheKVStore implements IterKVStore.
func (st *iavlStore) CacheKVStore() CacheKVStore {
return st.CacheIterKVStore()
}
// CacheIterKVStore implements IterKVStore.
func (st *iavlStore) CacheIterKVStore() CacheIterKVStore {
// XXX Create generic IterKVStore wrapper.
return nil
}
// Set implements IterKVStore.
// Set implements KVStore.
func (st *iavlStore) Set(key, value []byte) {
st.tree.Set(key, value)
}
// Get implements IterKVStore.
// Get implements KVStore.
func (st *iavlStore) Get(key []byte) (value []byte) {
_, v := st.tree.Get(key)
return v
}
// Has implements IterKVStore.
// Has implements KVStore.
func (st *iavlStore) Has(key []byte) (exists bool) {
return st.tree.Has(key)
}
// Remove implements IterKVStore.
func (st *iavlStore) Remove(key []byte) {
// Delete implements KVStore.
func (st *iavlStore) Delete(key []byte) {
st.tree.Remove(key)
}
// Iterator implements IterKVStore.
// Iterator implements KVStore.
func (st *iavlStore) Iterator(start, end []byte) Iterator {
return newIAVLIterator(st.tree.Tree(), start, end, true)
}
@ -128,16 +117,6 @@ func (st *iavlStore) ReverseIterator(start, end []byte) Iterator {
return newIAVLIterator(st.tree.Tree(), start, end, false)
}
// First implements IterKVStore.
func (st *iavlStore) First(start, end []byte) (kv KVPair, ok bool) {
return iteratorFirst(st, start, end)
}
// Last implements IterKVStore.
func (st *iavlStore) Last(start, end []byte) (kv KVPair, ok bool) {
return iteratorLast(st, start, end)
}
//----------------------------------------
// Implements Iterator

View File

@ -54,7 +54,7 @@ func TestIAVLStoreLoader(t *testing.T) {
assert.Equal(t, id.Version+1, id2.Version)
}
func TestIAVLStoreGetSetHasRemove(t *testing.T) {
func TestIAVLStoreGetSetHasDelete(t *testing.T) {
db := dbm.NewMemDB()
tree, _ := newTree(t, db)
iavlStore := newIAVLStore(tree, numHistory)
@ -64,21 +64,16 @@ func TestIAVLStoreGetSetHasRemove(t *testing.T) {
exists := iavlStore.Has([]byte(key))
assert.True(t, exists)
value, exists := iavlStore.Get([]byte(key))
assert.True(t, exists)
value := iavlStore.Get([]byte(key))
assert.EqualValues(t, value, treeData[key])
value2 := "notgoodbye"
prev := iavlStore.Set([]byte(key), []byte(value2))
assert.EqualValues(t, value, prev)
iavlStore.Set([]byte(key), []byte(value2))
value, exists = iavlStore.Get([]byte(key))
assert.True(t, exists)
value = iavlStore.Get([]byte(key))
assert.EqualValues(t, value, value2)
prev, removed := iavlStore.Remove([]byte(key))
assert.True(t, removed)
assert.EqualValues(t, value2, prev)
iavlStore.Delete([]byte(key))
exists = iavlStore.Has([]byte(key))
assert.False(t, exists)

View File

@ -62,16 +62,17 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
if err != nil {
return fmt.Errorf("Failed to load rootMultiStore: %v", err)
}
rs.curVersion = 1
rs.lastCommitID = CommitID{}
rs.substores[name] = store
}
rs.curVersion = 1
rs.lastCommitID = CommitID{}
return nil
}
// Otherwise, version is 1 or greater
// Load commitState
state, err := loadCommitState(rs.db, ver)
// Get commitState
state, err := getCommitState(rs.db, ver)
if err != nil {
return err
}
@ -82,7 +83,7 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
name, commitID := store.Name, store.CommitID
storeLoader := rs.storeLoaders[name]
if storeLoader == nil {
return fmt.Errorf("Failed to loadrootMultiStore: CommitStoreLoader missing for %v", name)
return fmt.Errorf("Failed to load rootMultiStore substore %v for commitID %v: %v", name, commitID, err)
}
store, err := storeLoader(commitID)
if err != nil {
@ -93,7 +94,7 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
// If any CommitStoreLoaders were not used, return error.
for name := range rs.storeLoaders {
if _, ok := rs.substores[name]; !ok {
if _, ok := newSubstores[name]; !ok {
return fmt.Errorf("Unused CommitStoreLoader: %v", name)
}
}
@ -105,56 +106,21 @@ func (rs *rootMultiStore) LoadVersion(ver int64) error {
return nil
}
// Commits each substore and gets commitState.
func (rs *rootMultiStore) doCommit() commitState {
version := rs.curVersion
substores := make([]substore, len(rs.substores))
for name, store := range rs.substores {
// Commit
commitID := store.Commit()
// Record CommitID
substore := substore{}
substore.Name = name
substore.CommitID = commitID
substores = append(substores, substore)
}
// Incr curVersion
rs.curVersion += 1
return commitState{
Version: version,
Substores: substores,
}
}
//----------------------------------------
// Implements CommitStore
func (rs *rootMultiStore) Commit() CommitID {
// Commit substores
version := rs.curVersion
state := commitSubstores(version, rs.substores)
// Needs to be transactional
// Need to update self state atomically.
batch := rs.db.NewBatch()
// Commit each substore and get commitState
state := rs.doCommit()
stateBytes, err := wire.Marshal(state)
if err != nil {
panic(err)
}
commitStateKey := fmt.Sprintf(commitStateKeyFmt, rs.curVersion)
batch.Set([]byte(commitStateKey), stateBytes)
// Save the latest version
latestBytes, _ := wire.Marshal(rs.curVersion) // Does not error
batch.Set([]byte(latestVersionKey), latestBytes)
setCommitState(batch, version, state)
setLatestVersion(batch, version)
batch.Write()
rs.curVersion += 1
// Prepare for next version.
rs.curVersion = version + 1
commitID := CommitID{
Version: version,
Hash: state.Hash(),
@ -165,7 +131,7 @@ func (rs *rootMultiStore) Commit() CommitID {
// Implements CommitStore
func (rs *rootMultiStore) CacheWrap() CacheWrap {
return rs.CacheMultiStore()
return rs.CacheMultiStore().(CacheWrap)
}
// Get the last committed CommitID
@ -188,11 +154,6 @@ func (rs *rootMultiStore) GetKVStore(name string) KVStore {
return rs.substores[name].(KVStore)
}
// Implements MultiStore
func (rs *rootMultiStore) GetIterKVStore(name string) IterKVStore {
return rs.substores[name].(IterKVStore)
}
//----------------------------------------
// commitState
@ -206,25 +167,7 @@ type commitState struct {
Substores []substore
}
// loads commitState from disk.
func loadCommitState(db dbm.DB, ver int64) (commitState, error) {
// Load from DB.
commitStateKey := fmt.Sprintf(commitStateKeyFmt, ver)
stateBytes := db.Get([]byte(commitStateKey))
if stateBytes == nil {
return commitState{}, fmt.Errorf("Failed to load rootMultiStore: no data")
}
// Parse bytes.
var state commitState
err := wire.Unmarshal(stateBytes, &state)
if err != nil {
return commitState{}, fmt.Errorf("Failed to load rootMultiStore: %v", err)
}
return state, nil
}
// Hash returns the simple merkle root hash of the substores sorted by name.
func (cs commitState) Hash() []byte {
// TODO cache to cs.hash []byte
m := make(map[string]interface{}, len(cs.Substores))
@ -244,6 +187,8 @@ func (cs commitState) CommitID() CommitID {
//----------------------------------------
// substore state
// substore contains the name and core reference for an underlying store.
// It is a leaf of the rootMultiStore's top-level simple merkle tree.
type substore struct {
Name string
substoreCore
@ -254,6 +199,7 @@ type substoreCore struct {
// ... maybe add more state
}
// Hash returns the RIPEMD160 of the wire-encoded substore.
func (sc substoreCore) Hash() []byte {
scBytes, _ := wire.Marshal(sc) // Does not error
hasher := ripemd160.New()
@ -275,3 +221,59 @@ func getLatestVersion(db dbm.DB) int64 {
}
return latest
}
// Set the latest version.
func setLatestVersion(batch dbm.Batch, version int64) {
latestBytes, _ := wire.Marshal(version) // Does not error
batch.Set([]byte(latestVersionKey), latestBytes)
}
// Commits each substore and returns a new commitState.
func commitSubstores(version int64, substoresMap map[string]CommitStore) commitState {
substores := make([]substore, 0, len(substoresMap))
for name, store := range substoresMap {
// Commit
commitID := store.Commit()
// Record CommitID
substore := substore{}
substore.Name = name
substore.CommitID = commitID
substores = append(substores, substore)
}
return commitState{
Version: version,
Substores: substores,
}
}
// Gets commitState from disk.
func getCommitState(db dbm.DB, ver int64) (commitState, error) {
// Get from DB.
commitStateKey := fmt.Sprintf(commitStateKeyFmt, ver)
stateBytes := db.Get([]byte(commitStateKey))
if stateBytes == nil {
return commitState{}, fmt.Errorf("Failed to get rootMultiStore: no data")
}
// Parse bytes.
var state commitState
err := wire.Unmarshal(stateBytes, &state)
if err != nil {
return commitState{}, fmt.Errorf("Failed to get rootMultiStore: %v", err)
}
return state, nil
}
// Set a commit state for given version.
func setCommitState(batch dbm.Batch, version int64, state commitState) {
stateBytes, err := wire.Marshal(state)
if err != nil {
panic(err)
}
commitStateKey := fmt.Sprintf(commitStateKeyFmt, version)
batch.Set([]byte(commitStateKey), stateBytes)
}

View File

@ -0,0 +1,126 @@
package store
import (
"testing"
"github.com/stretchr/testify/assert"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/merkle"
)
// TestMultistoreCommitLoad exercises the commit/load cycle of the
// rootMultiStore: fresh load, repeated commits, reloading at the latest
// and at an older version, and overwriting an old commit.
func TestMultistoreCommitLoad(t *testing.T) {
	db := dbm.NewMemDB()
	store := newMultiStoreWithLoaders(db)
	err := store.LoadLatestVersion()
	assert.Nil(t, err)

	// new store has empty last commit
	commitID := CommitID{}
	checkStore(t, store, commitID, commitID)

	// make a few commits and check them
	nCommits := int64(3)
	for i := int64(0); i < nCommits; i++ {
		commitID = store.Commit()
		expectedCommitID := getExpectedCommitID(store, i+1)
		checkStore(t, store, expectedCommitID, commitID)
	}

	// Load the latest multistore again and check version
	store = newMultiStoreWithLoaders(db)
	err = store.LoadLatestVersion()
	assert.Nil(t, err)
	commitID = getExpectedCommitID(store, nCommits)
	checkStore(t, store, commitID, commitID)

	// commit and check version
	commitID = store.Commit()
	expectedCommitID := getExpectedCommitID(store, nCommits+1)
	checkStore(t, store, expectedCommitID, commitID)

	// Load an older multistore and check version
	ver := nCommits - 1
	store = newMultiStoreWithLoaders(db)
	err = store.LoadVersion(ver)
	assert.Nil(t, err)
	commitID = getExpectedCommitID(store, ver)
	checkStore(t, store, commitID, commitID)

	// XXX: commit this older version
	commitID = store.Commit()
	expectedCommitID = getExpectedCommitID(store, ver+1)
	checkStore(t, store, expectedCommitID, commitID)

	// XXX: confirm old commit is overwritten and
	// we have rolled back LatestVersion
	store = newMultiStoreWithLoaders(db)
	err = store.LoadLatestVersion()
	assert.Nil(t, err)
	commitID = getExpectedCommitID(store, ver+1)
	checkStore(t, store, commitID, commitID)
}
//-----------------------------------------------------------------------
// utils

// newMultiStoreWithLoaders builds a rootMultiStore over db with three
// mock substore loaders ("store1".."store3") registered.
func newMultiStoreWithLoaders(db dbm.DB) *rootMultiStore {
	store := NewMultiStore(db)
	for _, name := range []string{"store1", "store2", "store3"} {
		store.SetCommitStoreLoader(name, newMockCommitStore)
	}
	return store
}
// checkStore asserts that expect == got, and that the store's current
// version (expect.Version+1) and last commit ID agree with expect.
func checkStore(t *testing.T, store *rootMultiStore, expect, got CommitID) {
	assert.EqualValues(t, expect.Version+1, store.GetCurrentVersion())
	assert.Equal(t, expect, got)
	assert.Equal(t, expect, store.LastCommitID())
}
// getExpectedCommitID computes the CommitID expected for version ver by
// independently hashing the store's current substores.
func getExpectedCommitID(store *rootMultiStore, ver int64) CommitID {
	return CommitID{
		Version: ver,
		Hash:    hashStores(store.substores),
	}
}
// hashStores builds the simple merkle root over the substores keyed by
// name, mirroring commitState.Hash().
// NOTE(review): this calls store.Commit() as a side effect. With the
// mockCommitStore that just returns a fixed CommitID, but it would be
// surprising with a real store — confirm intent.
func hashStores(stores map[string]CommitStore) []byte {
	m := make(map[string]interface{}, len(stores))
	for name, store := range stores {
		m[name] = substore{
			Name: name,
			substoreCore: substoreCore{
				CommitID: store.Commit(),
			},
		}
	}
	return merkle.SimpleHashFromMap(m)
}
//-----------------------------------------------------------------------
// mockCommitStore

// Compile-time check that *mockCommitStore satisfies CommitStore.
var _ CommitStore = (*mockCommitStore)(nil)

// mockCommitStore is a stub CommitStore whose Commit() always returns the
// same fixed CommitID.
type mockCommitStore struct {
	id CommitID
}
// newMockCommitStore is a CommitStoreLoader that ignores persistence and
// returns a mock store pinned to id. It never errors.
func newMockCommitStore(id CommitID) (CommitStore, error) {
	return &mockCommitStore{id}, nil
}
// Commit implements Committer; it returns the fixed id without persisting
// anything.
func (cs *mockCommitStore) Commit() CommitID {
	return cs.id
}
// CacheWrap implements CacheWrapper by returning a shallow copy, which is
// sufficient for this stateless mock.
func (cs *mockCommitStore) CacheWrap() CacheWrap {
	cs2 := *cs
	return &cs2
}
// Write implements CacheWrap as a no-op.
func (cs *mockCommitStore) Write() {}

View File

@ -2,158 +2,11 @@ package store
import (
"github.com/tendermint/go-wire/data"
"github.com/tendermint/tmlibs/db"
)
type CommitID struct {
Version int64
Hash []byte
}
func (cid CommitID) IsZero() bool {
return cid.Version == 0 && len(cid.Hash) == 0
}
type Committer interface {
// Commit persists the state to disk.
Commit() CommitID
}
type CacheWrapper interface {
/*
CacheWrap() makes the most appropriate cache-wrap. For example,
IAVLStore.CacheWrap() returns a CacheIterKVStore. After call to
.Write() on the cache-wrap, all previous cache-wraps on the object
expire.
CacheWrap() should not return a Committer, since Commit() on
cache-wraps make no sense. It can return KVStore, IterKVStore, etc.
The returned object may or may not implement CacheWrap() as well.
NOTE: https://dave.cheney.net/2017/07/22/should-go-2-0-support-generics.
*/
CacheWrap() CacheWrap
}
type CacheWrap interface {
// Write syncs with the underlying store.
Write()
// CacheWrap recursively wraps again.
CacheWrap() CacheWrap
}
type CommitStore interface {
Committer
CacheWrapper
}
type CommitStoreLoader func(id CommitID) (CommitStore, error)
// KVStore is a simple interface to get/set data
type KVStore interface {
// Get returns nil iff key doesn't exist. Panics on nil key.
Get(key []byte) []byte
// Set sets the key. Panics on nil key.
Set(key, value []byte)
// Has checks if a key exists. Panics on nil key.
Has(key []byte) bool
// Remove deletes the key. Panics on nil key.
Remove(key []byte)
// CacheKVStore() wraps a thing with a cache. After
// calling .Write() on the CacheKVStore, all previous
// cache-wraps on the object expire.
CacheKVStore() CacheKVStore
// CacheWrap() returns a CacheKVStore.
CacheWrap() CacheWrap
}
type CacheKVStore interface {
KVStore
Write() // Writes operations to underlying KVStore
}
// IterKVStore can be iterated on
// CONTRACT: No writes may happen within a domain while an iterator exists over it.
type IterKVStore interface {
KVStore
Iterator(start, end []byte) Iterator
ReverseIterator(start, end []byte) Iterator
// Gets the first item.
First(start, end []byte) (kv KVPair, ok bool)
// Gets the last item (towards "end").
// End is exclusive.
Last(start, end []byte) (kv KVPair, ok bool)
// CacheIterKVStore() wraps a thing with a cache.
// After calling .Write() on the CacheIterKVStore, all
// previous cache-wraps on the object expire.
CacheIterKVStore() CacheIterKVStore
// CacheWrap() returns a CacheIterKVStore.
// CacheWrap() defined in KVStore
}
type CacheIterKVStore interface {
IterKVStore
Write() // Writes operations to underlying KVStore
}
type KVPair struct {
Key data.Bytes
Value data.Bytes
}
/*
Usage:
for itr := kvm.Iterator(start, end); itr.Valid(); itr.Next() {
k, v := itr.Key(); itr.Value()
....
}
*/
type Iterator interface {
// The start & end (exclusive) limits to iterate over.
// If end < start, then the Iterator goes in reverse order.
// A domain of ([]byte{12, 13}, []byte{12, 14}) will iterate
// over anything with the prefix []byte{12, 13}
Domain() (start []byte, end []byte)
// Returns if the current position is valid.
Valid() bool
// Next moves the iterator to the next key/value pair.
//
// If Valid returns false, this method will panic.
Next()
// Key returns the key of the current key/value pair, or nil if done.
// The caller should not modify the contents of the returned slice, and
// its contents may change after calling Next().
//
// If Valid returns false, this method will panic.
Key() []byte
// Value returns the key of the current key/value pair, or nil if done.
// The caller should not modify the contents of the returned slice, and
// its contents may change after calling Next().
//
// If Valid returns false, this method will panic.
Value() []byte
// Releases any resources and iteration-locks
Release()
}
//----------------------------------------
// MultiStore
type MultiStore interface {
@ -170,16 +23,108 @@ type MultiStore interface {
// call CacheMultiStore.Write().
CacheMultiStore() CacheMultiStore
// CacheWrap returns a CacheMultiStore.
CacheWrap() CacheWrap
// Convenience
GetStore(name string) interface{}
GetKVStore(name string) KVStore
GetIterKVStore(name string) IterKVStore
}
type CacheMultiStore interface {
MultiStore
Write() // Writes operations to underlying KVStore
}
type CommitStore interface {
Committer
CacheWrapper
}
type CommitStoreLoader func(id CommitID) (CommitStore, error)
type Committer interface {
// Commit persists the state to disk.
Commit() CommitID
}
//----------------------------------------
// KVStore
// KVStore is a simple interface to get/set data
type KVStore interface {
// Get returns nil iff key doesn't exist. Panics on nil key.
Get(key []byte) []byte
// Has checks if a key exists. Panics on nil key.
Has(key []byte) bool
// Set sets the key. Panics on nil key.
Set(key, value []byte)
// Delete deletes the key. Panics on nil key.
Delete(key []byte)
// Iterator over a domain of keys in ascending order. End is exclusive.
// Start must be less than end, or the Iterator is invalid.
// CONTRACT: No writes may happen within a domain while an iterator exists over it.
Iterator(start, end []byte) Iterator
// Iterator over a domain of keys in descending order. End is exclusive.
// Start must be greater than end, or the Iterator is invalid.
// CONTRACT: No writes may happen within a domain while an iterator exists over it.
ReverseIterator(start, end []byte) Iterator
}
// db.DB implements KVStore so we can CacheKVStore it.
var _ KVStore = db.DB(nil)
// Alias iterator to db's Iterator for convenience.
type Iterator = db.Iterator
// CacheKVStore cache-wraps a KVStore. After calling .Write() on the
// CacheKVStore, all previously created CacheKVStores on the object expire.
type CacheKVStore interface {
KVStore
Write() // Writes operations to underlying KVStore
}
//----------------------------------------
// CacheWrap
/*
CacheWrap() makes the most appropriate cache-wrap. For example,
IAVLStore.CacheWrap() returns a CacheKVStore.
CacheWrap() should not return a Committer, since Commit() on
cache-wraps make no sense. It can return KVStore, HeapStore,
SpaceStore, etc.
*/
type CacheWrapper interface {
CacheWrap() CacheWrap
}
type CacheWrap interface {
// Write syncs with the underlying store.
Write()
// CacheWrap recursively wraps again.
CacheWrap() CacheWrap
}
//----------------------------------------
// etc
type KVPair struct {
Key data.Bytes
Value data.Bytes
}
// CommitID contains the tree version number and its merkle root.
type CommitID struct {
Version int64
Hash []byte
}
func (cid CommitID) IsZero() bool {
return cid.Version == 0 && len(cid.Hash) == 0
}