cacheMergeIterator (#298)

This commit is contained in:
Jae Kwon 2017-12-11 23:30:44 -08:00 committed by GitHub
parent 04fcc6193c
commit af7a621440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 661 additions and 98 deletions

255
store/cacheiterkvstore.go Normal file
View File

@ -0,0 +1,255 @@
package store
// TODO: Consider merge w/ tendermint/tmlibs/db/cachedb.go.
import (
"bytes"
"sort"
"sync"
"sync/atomic"
)
// If value is nil but deleted is false, it means the parent doesn't have the
// key. (No need to delete upon Write())
type cValue struct {
value []byte
deleted bool
dirty bool
}
// cacheIterKVStore wraps an in-memory cache around an underlying IterKVStore.
type cacheIterKVStore struct {
mtx sync.Mutex
cache map[string]cValue
parent IterKVStore
lockVersion interface{}
cwwMutex
}
var _ CacheIterKVStore = (*cacheIterKVStore)(nil)
// Users should typically not be required to call NewCacheIterKVStore directly, as the
// IterKVStore implementations here provide a .CacheIterKVStore() function already.
// `lockVersion` is typically provided by parent.GetWriteLockVersion().
func NewCacheIterKVStore(parent IterKVStore, lockVersion interface{}) *cacheIterKVStore {
ci := &cacheIterKVStore{
cache: make(map[string]cValue),
parent: parent,
lockVersion: lockVersion,
cwwMutex: NewCWWMutex(),
}
return ci
}
func (ci *cacheIterKVStore) Get(key []byte) (value []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
cacheValue, ok := ci.cache[string(key)]
if !ok {
value = ci.parent.Get(key)
ci.setCacheValue(key, value, false, false)
} else {
value = cacheValue.value
}
return value
}
func (ci *cacheIterKVStore) Set(key []byte, value []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
ci.setCacheValue(key, value, false, true)
}
func (ci *cacheIterKVStore) Has(key []byte) bool {
value := ci.Get(key)
return value != nil
}
func (ci *cacheIterKVStore) Remove(key []byte) {
ci.mtx.Lock()
defer ci.mtx.Unlock()
ci.assertValidKey(key)
ci.setCacheValue(key, nil, true, true)
}
// Write writes pending updates to the parent database and clears the cache.
// NOTE: Not atomic.
func (ci *cacheIterKVStore) Write() {
ci.mtx.Lock()
defer ci.mtx.Unlock()
// Optional sanity check to ensure that cacheIterKVStore is valid
if parent, ok := ci.parent.(WriteLocker); ok {
if parent.TryWriteLock(ci.lockVersion) {
// All good!
} else {
panic("parent.Write() failed. Did this CacheIterKVStore expire?")
}
}
// We need a copy of all of the keys.
// Not the best, but probably not a bottleneck depending.
keys := make([]string, 0, len(ci.cache))
for key, dbValue := range ci.cache {
if dbValue.dirty {
keys = append(keys, key)
}
}
sort.Strings(keys)
// TODO in tmlibs/db we use Batch to write atomically.
// Consider locking the underlying IterKVStore during write.
for _, key := range keys {
cacheValue := ci.cache[key]
if cacheValue.deleted {
ci.parent.Remove([]byte(key))
} else if cacheValue.value == nil {
// Skip, it already doesn't exist in parent.
} else {
ci.parent.Set([]byte(key), cacheValue.value)
}
}
// Clear the cache
ci.cache = make(map[string]cValue)
}
//----------------------------------------
// To cache-wrap this cacheIterKVStore further.
func (ci *cacheIterKVStore) CacheWrap() CacheWrap {
return ci.CacheIterKVStore()
}
func (ci *cacheIterKVStore) CacheKVStore() CacheKVStore {
return ci.CacheIterKVStore()
}
func (ci *cacheIterKVStore) CacheIterKVStore() CacheIterKVStore {
return NewCacheIterKVStore(ci, ci.GetWriteLockVersion())
}
// If the parent parent DB implements this, (e.g. such as a cacheIterKVStore
// parent to a cacheIterKVStore child), cacheIterKVStore will call
// `parent.TryWriteLock()` before attempting to write.
// This prevents multiple siblings from Write'ing to the parent.
type WriteLocker interface {
GetWriteLockVersion() (lockVersion interface{})
TryWriteLock(lockVersion interface{}) bool
}
// Implements TryWriteLocker. Embed this in DB structs if desired.
type cwwMutex struct {
mtx sync.Mutex
// CONTRACT: reading/writing to `*written` should use `atomic.*`.
// CONTRACT: replacing `written` with another *int32 should use `.mtx`.
written *int32
}
func NewCWWMutex() cwwMutex {
return cwwMutex{
written: new(int32),
}
}
func (cww *cwwMutex) GetWriteLockVersion() interface{} {
cww.mtx.Lock()
defer cww.mtx.Unlock()
// `written` works as a "version" object because it gets replaced upon
// successful TryWriteLock.
return cww.written
}
func (cww *cwwMutex) TryWriteLock(version interface{}) bool {
cww.mtx.Lock()
defer cww.mtx.Unlock()
if version != cww.written {
return false // wrong "WriteLockVersion"
}
if !atomic.CompareAndSwapInt32(cww.written, 0, 1) {
return false // already written
}
// New "WriteLockVersion"
cww.written = new(int32)
return true
}
//----------------------------------------
// Iteration
func (ci *cacheIterKVStore) Iterator(start, end []byte) Iterator {
return ci.iterator(start, end, true)
}
func (ci *cacheIterKVStore) ReverseIterator(start, end []byte) Iterator {
return ci.iterator(start, end, false)
}
func (ci *cacheIterKVStore) iterator(start, end []byte, ascending bool) Iterator {
var parent, cache Iterator
if ascending {
parent = ci.parent.Iterator(start, end)
} else {
parent = ci.parent.ReverseIterator(start, end)
}
items := ci.dirtyItems(ascending)
cache = newMemIterator(start, end, items)
return newCacheMergeIterator(parent, cache, ascending)
}
func (ci *cacheIterKVStore) First(start, end []byte) (kv KVPair, ok bool) {
return iteratorFirst(ci, start, end)
}
func (ci *cacheIterKVStore) Last(start, end []byte) (kv KVPair, ok bool) {
return iteratorLast(ci, start, end)
}
// Constructs a slice of dirty items, to use w/ memIterator.
func (ci *cacheIterKVStore) dirtyItems(ascending bool) []KVPair {
items := make([]KVPair, 0, len(ci.cache))
for key, cacheValue := range ci.cache {
if !cacheValue.dirty {
continue
}
items = append(items,
KVPair{[]byte(key), cacheValue.value})
}
sort.Slice(items, func(i, j int) bool {
if ascending {
return bytes.Compare(items[i].Key, items[j].Key) < 0
} else {
return bytes.Compare(items[i].Key, items[j].Key) > 0
}
})
return items
}
//----------------------------------------
// etc
func (ci *cacheIterKVStore) assertValidKey(key []byte) {
if key == nil {
panic("key is nil")
}
}
// Only entrypoint to mutate ci.cache.
func (ci *cacheIterKVStore) setCacheValue(key, value []byte, deleted bool, dirty bool) {
cacheValue := cValue{
value: value,
deleted: deleted,
dirty: dirty,
}
ci.cache[string(key)] = cacheValue
}

237
store/cachemergeiterator.go Normal file
View File

@ -0,0 +1,237 @@
package store
import "bytes"
// cacheMergeIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
// had been deleted (but not deleted in the parent).
// If the cache iterator has the same key as the parent, the
// cache shadows (overrides) the parent.
//
// TODO: Optimize by memoizing.
type cacheMergeIterator struct {
parent Iterator
cache Iterator
ascending bool
}
var _ Iterator = (*cacheMergeIterator)(nil)
func newCacheMergeIterator(parent, cache Iterator, ascending bool) *cacheMergeIterator {
iter := &cacheMergeIterator{
parent: parent,
cache: cache,
ascending: ascending,
}
return iter
}
// Domain implements Iterator.
// If the domains are different, returns the union.
func (iter *cacheMergeIterator) Domain() (start, end []byte) {
startP, endP := iter.parent.Domain()
startC, endC := iter.cache.Domain()
if iter.compare(startP, startC) < 0 {
start = startP
} else {
start = startC
}
if iter.compare(endP, endC) < 0 {
end = endC
} else {
end = endP
}
return start, end
}
// Valid implements Iterator.
func (iter *cacheMergeIterator) Valid() bool {
// If parent is valid, this is valid.
if iter.parent.Valid() {
return true
}
// Otherwise depends on child.
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
}
// Next implements Iterator
func (iter *cacheMergeIterator) Next() {
iter.skipUntilExistsOrInvalid()
iter.assertValid()
// If parent is invalid, get the next cache item.
if !iter.parent.Valid() {
iter.cache.Next()
return
}
// If cache is invalid, get the next parent item.
if !iter.cache.Valid() {
iter.parent.Next()
return
}
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}
// Key implements Iterator
func (iter *cacheMergeIterator) Key() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()
// If parent is invalid, get the cache key.
if !iter.parent.Valid() {
return iter.cache.Key()
}
// If cache is invalid, get the parent key.
if !iter.cache.Valid() {
return iter.parent.Key()
}
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return keyP
case 0: // parent == cache
return keyP
case 1: // parent > cache
return keyC
default:
panic("invalid compare result")
}
}
// Value implements Iterator
func (iter *cacheMergeIterator) Value() []byte {
iter.skipUntilExistsOrInvalid()
iter.assertValid()
// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
return iter.cache.Value()
}
// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
return iter.parent.Value()
}
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return iter.parent.Value()
case 0: // parent == cache
return iter.cache.Value()
case 1: // parent > cache
return iter.cache.Value()
default:
panic("invalid comparison result")
}
}
// Release implements Iterator
func (iter *cacheMergeIterator) Release() {
iter.parent.Release()
iter.cache.Release()
}
// Like bytes.Compare but opposite if not ascending.
func (iter *cacheMergeIterator) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
} else {
return bytes.Compare(a, b) * -1
}
}
// Skip all delete-items from the cache w/ `key < until`. After this function,
// current item is a non-delete-item, or `until <= key`.
// If the current item is not a delete item, does noting.
// If `until` is nil, there is no limit.
// CONTRACT: cache is valid.
func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) {
for (until == nil || iter.compare(iter.cache.Key(), until) < 0) &&
iter.cache.Value() == nil {
iter.cache.Next()
if !iter.cache.Valid() {
return
}
}
}
// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() {
for {
// Invalid.
if !iter.Valid() {
return
}
// Parent and Cache items exist.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
cmp := iter.compare(keyP, keyC)
switch cmp {
// parent < cache
case -1:
// Parent exists.
return
// parent == cache
case 0:
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()
continue
}
// Child shadows parent.
return
// parent > cache
case 1:
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.skipCacheDeletes(keyP)
continue
}
// Child exists.
return
}
}
}
// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *cacheMergeIterator) assertValid() {
if !iter.Valid() {
panic("iterator is invalid")
}
}

36
store/firstlast.go Normal file
View File

@ -0,0 +1,36 @@
package store
import "bytes"
// Convenience for implemntation of IterKVCache.First using IterKVCache.Iterator
func iteratorFirst(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
iter := st.Iterator(start, end)
if !iter.Valid() {
return kv, false
}
defer iter.Release()
return KVPair{iter.Key(), iter.Value()}, true
}
// Convenience for implemntation of IterKVCache.Last using IterKVCache.ReverseIterator
func iteratorLast(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
iter := st.ReverseIterator(end, start)
if !iter.Valid() {
if v := st.Get(start); v != nil {
return KVPair{cp(start), cp(v)}, true
} else {
return kv, false
}
}
defer iter.Release()
if bytes.Equal(iter.Key(), end) {
// Skip this one, end is exclusive.
iter.Next()
if !iter.Valid() {
return kv, false
}
}
return KVPair{iter.Key(), iter.Value()}, true
}

View File

@ -1,7 +1,6 @@
package store
import (
"bytes"
"sync"
"github.com/tendermint/iavl"
@ -79,16 +78,14 @@ func (st *iavlStore) CacheIterKVStore() CacheIterKVStore {
}
// Set implements IterKVStore.
func (st *iavlStore) Set(key, value []byte) (prev []byte) {
_, prev = st.tree.Get(key)
func (st *iavlStore) Set(key, value []byte) {
st.tree.Set(key, value)
return prev
}
// Get implements IterKVStore.
func (st *iavlStore) Get(key []byte) (value []byte, exists bool) {
func (st *iavlStore) Get(key []byte) (value []byte) {
_, v := st.tree.Get(key)
return v, (v != nil)
return v
}
// Has implements IterKVStore.
@ -97,8 +94,8 @@ func (st *iavlStore) Has(key []byte) (exists bool) {
}
// Remove implements IterKVStore.
func (st *iavlStore) Remove(key []byte) (prev []byte, removed bool) {
return st.tree.Remove(key)
func (st *iavlStore) Remove(key []byte) {
st.tree.Remove(key)
}
// Iterator implements IterKVStore.
@ -113,39 +110,17 @@ func (st *iavlStore) ReverseIterator(start, end []byte) Iterator {
// First implements IterKVStore.
func (st *iavlStore) First(start, end []byte) (kv KVPair, ok bool) {
iter := st.Iterator(start, end)
if !iter.Valid() {
return kv, false
}
defer iter.Release()
return KVPair{iter.Key(), iter.Value()}, true
return iteratorFirst(st, start, end)
}
// Last implements IterKVStore.
func (st *iavlStore) Last(start, end []byte) (kv KVPair, ok bool) {
iter := st.ReverseIterator(end, start)
if !iter.Valid() {
if v, ok := st.Get(start); ok {
return KVPair{cp(start), cp(v)}, true
} else {
return kv, false
}
}
defer iter.Release()
if bytes.Equal(iter.Key(), end) {
// Skip this one, end is exclusive.
iter.Next()
if !iter.Valid() {
return kv, false
}
}
return KVPair{iter.Key(), iter.Value()}, true
return iteratorLast(st, start, end)
}
//----------------------------------------
// Implements Iterator
type iavlIterator struct {
// Underlying store
tree *iavl.Tree
@ -179,9 +154,9 @@ var _ Iterator = (*iavlIterator)(nil)
// newIAVLIterator will create a new iavlIterator.
// CONTRACT: Caller must release the iavlIterator, as each one creates a new
// goroutine.
func newIAVLIterator(t *iavl.Tree, start, end []byte, ascending bool) *iavlIterator {
itr := &iavlIterator{
tree: t,
func newIAVLIterator(tree *iavl.Tree, start, end []byte, ascending bool) *iavlIterator {
iter := &iavlIterator{
tree: tree,
start: cp(start),
end: cp(end),
ascending: ascending,
@ -189,116 +164,116 @@ func newIAVLIterator(t *iavl.Tree, start, end []byte, ascending bool) *iavlItera
quitCh: make(chan struct{}),
initCh: make(chan struct{}),
}
go itr.iterateRoutine()
go itr.initRoutine()
return itr
go iter.iterateRoutine()
go iter.initRoutine()
return iter
}
// Run this to funnel items from the tree to iterCh.
func (ii *iavlIterator) iterateRoutine() {
ii.tree.IterateRange(
ii.start, ii.end, ii.ascending,
func (iter *iavlIterator) iterateRoutine() {
iter.tree.IterateRange(
iter.start, iter.end, iter.ascending,
func(key, value []byte) bool {
select {
case <-ii.quitCh:
case <-iter.quitCh:
return true // done with iteration.
case ii.iterCh <- KVPair{key, value}:
case iter.iterCh <- KVPair{key, value}:
return false // yay.
}
},
)
close(ii.iterCh) // done.
close(iter.iterCh) // done.
}
// Run this to fetch the first item.
func (ii *iavlIterator) initRoutine() {
ii.receiveNext()
close(ii.initCh)
func (iter *iavlIterator) initRoutine() {
iter.receiveNext()
close(iter.initCh)
}
// Domain implements Iterator
func (ii *iavlIterator) Domain() (start, end []byte) {
return ii.start, ii.end
func (iter *iavlIterator) Domain() (start, end []byte) {
return iter.start, iter.end
}
// Valid implements Iterator
func (ii *iavlIterator) Valid() bool {
ii.waitInit()
ii.mtx.Lock()
defer ii.mtx.Unlock()
func (iter *iavlIterator) Valid() bool {
iter.waitInit()
iter.mtx.Lock()
defer iter.mtx.Unlock()
return !ii.invalid
return !iter.invalid
}
// Next implements Iterator
func (ii *iavlIterator) Next() {
ii.waitInit()
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
func (iter *iavlIterator) Next() {
iter.waitInit()
iter.mtx.Lock()
defer iter.mtx.Unlock()
iter.assertIsValid()
ii.receiveNext()
iter.receiveNext()
}
// Key implements Iterator
func (ii *iavlIterator) Key() []byte {
ii.waitInit()
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
func (iter *iavlIterator) Key() []byte {
iter.waitInit()
iter.mtx.Lock()
defer iter.mtx.Unlock()
iter.assertIsValid()
return ii.key
return iter.key
}
// Value implements Iterator
func (ii *iavlIterator) Value() []byte {
ii.waitInit()
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
func (iter *iavlIterator) Value() []byte {
iter.waitInit()
iter.mtx.Lock()
defer iter.mtx.Unlock()
iter.assertIsValid()
return ii.value
return iter.value
}
// Release implements Iterator
func (ii *iavlIterator) Release() {
close(ii.quitCh)
func (iter *iavlIterator) Release() {
close(iter.quitCh)
}
//----------------------------------------
func (ii *iavlIterator) setNext(key, value []byte) {
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
func (iter *iavlIterator) setNext(key, value []byte) {
iter.mtx.Lock()
defer iter.mtx.Unlock()
iter.assertIsValid()
ii.key = key
ii.value = value
iter.key = key
iter.value = value
}
func (ii *iavlIterator) setInvalid() {
ii.mtx.Lock()
defer ii.mtx.Unlock()
ii.assertIsValid()
func (iter *iavlIterator) setInvalid() {
iter.mtx.Lock()
defer iter.mtx.Unlock()
iter.assertIsValid()
ii.invalid = true
iter.invalid = true
}
func (ii *iavlIterator) waitInit() {
<-ii.initCh
func (iter *iavlIterator) waitInit() {
<-iter.initCh
}
func (ii *iavlIterator) receiveNext() {
kvPair, ok := <-ii.iterCh
func (iter *iavlIterator) receiveNext() {
kvPair, ok := <-iter.iterCh
if ok {
ii.setNext(kvPair.Key, kvPair.Value)
iter.setNext(kvPair.Key, kvPair.Value)
} else {
ii.setInvalid()
iter.setInvalid()
}
}
func (ii *iavlIterator) assertIsValid() {
if ii.invalid {
func (iter *iavlIterator) assertIsValid() {
if iter.invalid {
panic("invalid iterator")
}
}

52
store/memiterator.go Normal file
View File

@ -0,0 +1,52 @@
package store
// Iterates over iterKVCache items.
// if key is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
start, end []byte
items []KVPair
}
func newMemIterator(start, end []byte, items []KVPair) *memIterator {
return &memIterator{
start: start,
end: end,
items: items,
}
}
func (mi *memIterator) Domain() ([]byte, []byte) {
return mi.start, mi.end
}
func (mi *memIterator) Valid() bool {
return len(mi.items) > 0
}
func (mi *memIterator) assertValid() {
if !mi.Valid() {
panic("memIterator is invalid")
}
}
func (mi *memIterator) Next() {
mi.assertValid()
mi.items = mi.items[1:]
}
func (mi *memIterator) Key() []byte {
mi.assertValid()
return mi.items[0].Key
}
func (mi *memIterator) Value() []byte {
mi.assertValid()
return mi.items[0].Value
}
func (mi *memIterator) Release() {
mi.start = nil
mi.end = nil
mi.items = nil
}

View File

@ -52,10 +52,18 @@ type CommitStoreLoader func(id CommitID) (CommitStore, error)
// KVStore is a simple interface to get/set data
type KVStore interface {
Set(key, value []byte) (prev []byte)
Get(key []byte) (value []byte, exists bool)
Has(key []byte) (exists bool)
Remove(key []byte) (prev []byte, removed bool)
// Get returns nil iff key doesn't exist. Panics on nil key.
Get(key []byte) []byte
// Set sets the key. Panics on nil key.
Set(key, value []byte)
// Has checks if a key exists. Panics on nil key.
Has(key []byte) bool
// Remove deletes the key. Panics on nil key.
Remove(key []byte)
// CacheKVStore() wraps a thing with a cache. After
// calling .Write() on the CacheKVStore, all previous