From af7a621440df3b757b56427837280bdcd23d23a4 Mon Sep 17 00:00:00 2001
From: Jae Kwon
Date: Mon, 11 Dec 2017 23:30:44 -0800
Subject: [PATCH] cacheMergeIterator (#298)

---
 store/cacheiterkvstore.go   | 255 ++++++++++++++++++++++++++++++++++++
 store/cachemergeiterator.go | 237 +++++++++++++++++++++++++++++++++
 store/firstlast.go          |  36 +++++
 store/iavlstore.go          | 163 ++++++++++-------------
 store/memiterator.go        |  52 ++++++++
 store/types.go              |  16 ++-
 6 files changed, 661 insertions(+), 98 deletions(-)
 create mode 100644 store/cacheiterkvstore.go
 create mode 100644 store/cachemergeiterator.go
 create mode 100644 store/firstlast.go
 create mode 100644 store/memiterator.go

diff --git a/store/cacheiterkvstore.go b/store/cacheiterkvstore.go
new file mode 100644
index 0000000000..7a6417f220
--- /dev/null
+++ b/store/cacheiterkvstore.go
@@ -0,0 +1,255 @@
+package store
+
+// TODO: Consider merge w/ tendermint/tmlibs/db/cachedb.go.
+
+import (
+	"bytes"
+	"sort"
+	"sync"
+	"sync/atomic"
+)
+
+// cValue is one entry of the cacheIterKVStore cache.
+// If value is nil but deleted is false, it means the parent doesn't have the
+// key. (No need to delete upon Write())
+type cValue struct {
+	value   []byte // cached value; nil when the key is absent or was removed
+	deleted bool   // set by Remove(); Write() then removes the key from the parent
+	dirty   bool   // set by Set()/Remove(); Write() only flushes dirty entries
+}
+
+// cacheIterKVStore wraps an in-memory cache around an underlying IterKVStore.
+// Reads fall through to the parent on a cache miss; Set/Remove are buffered in
+// the cache until Write() pushes them to the parent.
+type cacheIterKVStore struct {
+	mtx         sync.Mutex        // held by Get/Set/Remove/Write while they touch cache
+	cache       map[string]cValue // entries keyed by string(key)
+	parent      IterKVStore       // the store being wrapped
+	lockVersion interface{}       // passed to parent.TryWriteLock() by Write()
+
+	// Embedded so that this store offers GetWriteLockVersion/TryWriteLock to
+	// caches layered on top of it.
+	cwwMutex
+}
+
+// Compile-time check that *cacheIterKVStore satisfies CacheIterKVStore.
+var _ CacheIterKVStore = (*cacheIterKVStore)(nil)
+
+// Users should typically not be required to call NewCacheIterKVStore directly, as the
+// IterKVStore implementations here provide a .CacheIterKVStore() function already.
+// `lockVersion` is typically provided by parent.GetWriteLockVersion().
+func NewCacheIterKVStore(parent IterKVStore, lockVersion interface{}) *cacheIterKVStore {
+	return &cacheIterKVStore{
+		cache:       make(map[string]cValue),
+		parent:      parent,
+		lockVersion: lockVersion,
+		cwwMutex:    NewCWWMutex(),
+	}
+}
+
+// Get returns the value cached for key. On a cache miss it reads through to
+// the parent and remembers the answer as a clean (non-dirty) entry, so a
+// later Write() does not flush it back. Panics on nil key.
+func (ci *cacheIterKVStore) Get(key []byte) (value []byte) {
+	ci.mtx.Lock()
+	defer ci.mtx.Unlock()
+	ci.assertValidKey(key)
+
+	if cacheValue, ok := ci.cache[string(key)]; ok {
+		return cacheValue.value
+	}
+
+	value = ci.parent.Get(key)
+	ci.setCacheValue(key, value, false, false)
+	return value
+}
+
+// Set buffers key=value in the cache as a dirty entry; the parent is not
+// touched until Write(). Panics on nil key.
+func (ci *cacheIterKVStore) Set(key []byte, value []byte) {
+	ci.mtx.Lock()
+	defer ci.mtx.Unlock()
+	ci.assertValidKey(key)
+
+	ci.setCacheValue(key, value, false, true)
+}
+
+// Has reports whether Get(key) returns a non-nil value. Panics on nil key.
+func (ci *cacheIterKVStore) Has(key []byte) bool {
+	return ci.Get(key) != nil
+}
+
+// Remove buffers a delete for key (nil value, deleted and dirty); the parent
+// is not touched until Write(). Panics on nil key.
+func (ci *cacheIterKVStore) Remove(key []byte) {
+	ci.mtx.Lock()
+	defer ci.mtx.Unlock()
+	ci.assertValidKey(key)
+
+	ci.setCacheValue(key, nil, true, true)
+}
+
+// Write writes pending updates to the parent database and clears the cache.
+// Panics if the parent is a WriteLocker and rejects this store's lockVersion.
+// NOTE: Not atomic.
+func (ci *cacheIterKVStore) Write() {
+	ci.mtx.Lock()
+	defer ci.mtx.Unlock()
+
+	// Optional sanity check to ensure that cacheIterKVStore is valid: when
+	// the parent is a WriteLocker, the version captured at construction must
+	// still be the parent's current, unclaimed one.
+	if parent, ok := ci.parent.(WriteLocker); ok && !parent.TryWriteLock(ci.lockVersion) {
+		panic("parent.Write() failed. Did this CacheIterKVStore expire?")
+	}
+
+	// We need a copy of all of the dirty keys, sorted so the parent is
+	// updated in a deterministic order (map iteration order is random).
+	// Not the best, but probably not a bottleneck depending.
+	keys := make([]string, 0, len(ci.cache))
+	for key, dbValue := range ci.cache {
+		if dbValue.dirty {
+			keys = append(keys, key)
+		}
+	}
+	sort.Strings(keys)
+
+	// TODO in tmlibs/db we use Batch to write atomically.
+	// Consider locking the underlying IterKVStore during write.
+	for _, key := range keys {
+		cacheValue := ci.cache[key]
+		switch {
+		case cacheValue.deleted:
+			ci.parent.Remove([]byte(key))
+		case cacheValue.value == nil:
+			// Skip, it already doesn't exist in parent.
+			// NOTE(review): a dirty, non-deleted entry with a nil value
+			// comes from Set(key, nil); it is dropped here, so the parent
+			// keeps any value it already has. Confirm that is intended.
+		default:
+			ci.parent.Set([]byte(key), cacheValue.value)
+		}
+	}
+
+	// Clear the cache
+	ci.cache = make(map[string]cValue)
+}
+
+//----------------------------------------
+// To cache-wrap this cacheIterKVStore further.
+
+// CacheWrap returns a new cache layered on top of this store.
+func (ci *cacheIterKVStore) CacheWrap() CacheWrap {
+	return ci.CacheIterKVStore()
+}
+
+// CacheKVStore returns a new cache layered on top of this store.
+func (ci *cacheIterKVStore) CacheKVStore() CacheKVStore {
+	return ci.CacheIterKVStore()
+}
+
+// CacheIterKVStore returns a new cacheIterKVStore whose parent is this store
+// and whose lockVersion is this store's current write-lock version.
+func (ci *cacheIterKVStore) CacheIterKVStore() CacheIterKVStore {
+	return NewCacheIterKVStore(ci, ci.GetWriteLockVersion())
+}
+
+// WriteLocker is an optional interface for parent stores. If the parent DB
+// implements this, (e.g. such as a cacheIterKVStore parent to a
+// cacheIterKVStore child), cacheIterKVStore will call
+// `parent.TryWriteLock()` before attempting to write.
+// This prevents multiple siblings from Write'ing to the parent.
+type WriteLocker interface {
+	GetWriteLockVersion() (lockVersion interface{})
+	TryWriteLock(lockVersion interface{}) bool
+}
+
+// cwwMutex implements WriteLocker. Embed this in DB structs if desired.
+type cwwMutex struct {
+	mtx sync.Mutex
+	// CONTRACT: reading/writing to `*written` should use `atomic.*`.
+	// CONTRACT: replacing `written` with another *int32 should use `.mtx`.
+	written *int32
+}
+
+// NewCWWMutex returns a cwwMutex holding a fresh, unclaimed version.
+func NewCWWMutex() cwwMutex {
+	return cwwMutex{
+		written: new(int32),
+	}
+}
+
+// GetWriteLockVersion returns the current version token.
+func (cww *cwwMutex) GetWriteLockVersion() interface{} {
+	cww.mtx.Lock()
+	defer cww.mtx.Unlock()
+
+	// `written` works as a "version" object because it gets replaced upon
+	// successful TryWriteLock.
+	return cww.written
+}
+
+// TryWriteLock claims the write lock for version. It returns false when
+// version is not the current token or the token was already claimed; on
+// success a new token is installed, so every holder of the old one is
+// rejected from then on.
+func (cww *cwwMutex) TryWriteLock(version interface{}) bool {
+	cww.mtx.Lock()
+	defer cww.mtx.Unlock()
+
+	if version != cww.written {
+		return false // wrong "WriteLockVersion"
+	}
+	if !atomic.CompareAndSwapInt32(cww.written, 0, 1) {
+		return false // already written
+	}
+
+	// New "WriteLockVersion"
+	cww.written = new(int32)
+	return true
+}
+
+//----------------------------------------
+// Iteration
+
+// Iterator returns an ascending iterator that merges the parent's iterator
+// with this store's dirty cache entries.
+func (ci *cacheIterKVStore) Iterator(start, end []byte) Iterator {
+	return ci.iterator(start, end, true)
+}
+
+// ReverseIterator is the descending counterpart of Iterator.
+func (ci *cacheIterKVStore) ReverseIterator(start, end []byte) Iterator {
+	return ci.iterator(start, end, false)
+}
+
+// iterator builds the merge of the parent's iterator over (start, end) and a
+// snapshot of the dirty cache entries, both in the requested direction.
+func (ci *cacheIterKVStore) iterator(start, end []byte, ascending bool) Iterator {
+	var parent Iterator
+	if ascending {
+		parent = ci.parent.Iterator(start, end)
+	} else {
+		parent = ci.parent.ReverseIterator(start, end)
+	}
+
+	// ci.cache is guarded by ci.mtx in Get/Set/Remove/Write; hold it while
+	// the dirty entries are copied out so the map read cannot race with a
+	// concurrent mutation.
+	ci.mtx.Lock()
+	items := ci.dirtyItems(ascending)
+	ci.mtx.Unlock()
+
+	// NOTE(review): dirtyItems returns every dirty entry and newMemIterator
+	// only records start/end, so cached keys outside the requested range are
+	// not filtered out here. Confirm whether callers rely on that.
+	cache := newMemIterator(start, end, items)
+	return newCacheMergeIterator(parent, cache, ascending)
+}
+
+// First delegates to iteratorFirst over this store.
+func (ci *cacheIterKVStore) First(start, end []byte) (kv KVPair, ok bool) {
+	return iteratorFirst(ci, start, end)
+}
+
+// Last delegates to iteratorLast over this store.
+func (ci *cacheIterKVStore) Last(start, end []byte) (kv KVPair, ok bool) {
+	return iteratorLast(ci, start, end)
+}
+
+// Constructs a slice of dirty items, to use w/ memIterator.
+func (ci *cacheIterKVStore) dirtyItems(ascending bool) []KVPair {
+	pairs := make([]KVPair, 0, len(ci.cache))
+	for k, entry := range ci.cache {
+		if entry.dirty {
+			pairs = append(pairs, KVPair{[]byte(k), entry.value})
+		}
+	}
+
+	// Pick the key ordering once, then sort with it.
+	less := func(a, b []byte) bool { return bytes.Compare(a, b) < 0 }
+	if !ascending {
+		less = func(a, b []byte) bool { return bytes.Compare(a, b) > 0 }
+	}
+	sort.Slice(pairs, func(i, j int) bool {
+		return less(pairs[i].Key, pairs[j].Key)
+	})
+	return pairs
+}
+
+//----------------------------------------
+// etc
+
+// assertValidKey panics when key is nil.
+func (ci *cacheIterKVStore) assertValidKey(key []byte) {
+	if key != nil {
+		return
+	}
+	panic("key is nil")
+}
+
+// Only entrypoint to mutate ci.cache.
+func (ci *cacheIterKVStore) setCacheValue(key, value []byte, deleted bool, dirty bool) {
+	ci.cache[string(key)] = cValue{
+		value:   value,
+		deleted: deleted,
+		dirty:   dirty,
+	}
+}
diff --git a/store/cachemergeiterator.go b/store/cachemergeiterator.go
new file mode 100644
index 0000000000..68f63a9b53
--- /dev/null
+++ b/store/cachemergeiterator.go
@@ -0,0 +1,237 @@
+package store
+
+import "bytes"
+
+// cacheMergeIterator merges a parent Iterator and a cache Iterator.
+// A cache item with a nil value signals that the key has been deleted
+// in the cache (but not yet deleted in the parent).
+// If the cache iterator has the same key as the parent, the
+// cache shadows (overrides) the parent.
+//
+// TODO: Optimize by memoizing.
+type cacheMergeIterator struct {
+	parent    Iterator
+	cache     Iterator
+	ascending bool
+}
+
+var _ Iterator = (*cacheMergeIterator)(nil)
+
+// newCacheMergeIterator pairs parent and cache; ascending must match the
+// direction both iterators were created with.
+func newCacheMergeIterator(parent, cache Iterator, ascending bool) *cacheMergeIterator {
+	merged := new(cacheMergeIterator)
+	merged.parent, merged.cache = parent, cache
+	merged.ascending = ascending
+	return merged
+}
+
+// Domain implements Iterator.
+// If the domains are different, returns the union.
+func (iter *cacheMergeIterator) Domain() (start, end []byte) {
+	startP, endP := iter.parent.Domain()
+	startC, endC := iter.cache.Domain()
+
+	// The smaller start and the larger end, in iteration order.
+	// NOTE(review): how a nil bound should be treated is not visible here.
+	start = startC
+	if iter.compare(startP, startC) < 0 {
+		start = startP
+	}
+	end = endP
+	if iter.compare(endP, endC) < 0 {
+		end = endC
+	}
+	return start, end
+}
+
+// Valid implements Iterator.
+// It reports whether a current item exists. To answer that it fast-forwards
+// past items the cache marks as deleted (nil value), so it may advance both
+// underlying iterators.
+func (iter *cacheMergeIterator) Valid() bool {
+	for {
+		// Parent exhausted: only non-deleted cache items remain.
+		if !iter.parent.Valid() {
+			iter.skipCacheDeletes(nil)
+			return iter.cache.Valid()
+		}
+
+		// Cache exhausted: the parent item stands on its own.
+		if !iter.cache.Valid() {
+			return true
+		}
+
+		// Both are valid. Compare keys.
+		keyP := iter.parent.Key()
+		cmp := iter.compare(keyP, iter.cache.Key())
+
+		// Parent comes first, or the cache item carries a real value.
+		if cmp < 0 || iter.cache.Value() != nil {
+			return true
+		}
+
+		// The cache item is a delete.
+		if cmp == 0 {
+			// It shadows the parent item: drop both.
+			iter.parent.Next()
+			iter.cache.Next()
+		} else {
+			// It precedes the parent item: drop deletes up to keyP.
+			iter.skipCacheDeletes(keyP)
+		}
+	}
+}
+
+// Next implements Iterator
+// Panics if the iterator is invalid.
+func (iter *cacheMergeIterator) Next() {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the next cache item.
+	if !iter.parent.Valid() {
+		iter.cache.Next()
+		return
+	}
+
+	// If cache is invalid, get the next parent item.
+	if !iter.cache.Valid() {
+		iter.parent.Next()
+		return
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+	switch iter.compare(keyP, keyC) {
+	case -1: // parent < cache
+		iter.parent.Next()
+	case 0: // parent == cache
+		iter.parent.Next()
+		iter.cache.Next()
+	case 1: // parent > cache
+		iter.cache.Next()
+	}
+}
+
+// Key implements Iterator
+// Panics if the iterator is invalid.
+func (iter *cacheMergeIterator) Key() []byte {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the cache key.
+	if !iter.parent.Valid() {
+		return iter.cache.Key()
+	}
+
+	// If cache is invalid, get the parent key.
+	if !iter.cache.Valid() {
+		return iter.parent.Key()
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+	switch iter.compare(keyP, keyC) {
+	case -1: // parent < cache
+		return keyP
+	case 0: // parent == cache
+		return keyP
+	case 1: // parent > cache
+		return keyC
+	default:
+		panic("invalid compare result")
+	}
+}
+
+// Value implements Iterator
+// On equal keys the cache value wins. Panics if the iterator is invalid.
+func (iter *cacheMergeIterator) Value() []byte {
+	iter.skipUntilExistsOrInvalid()
+	iter.assertValid()
+
+	// If parent is invalid, get the cache value.
+	if !iter.parent.Valid() {
+		return iter.cache.Value()
+	}
+
+	// If cache is invalid, get the parent value.
+	if !iter.cache.Valid() {
+		return iter.parent.Value()
+	}
+
+	// Both are valid. Compare keys.
+	keyP, keyC := iter.parent.Key(), iter.cache.Key()
+	switch iter.compare(keyP, keyC) {
+	case -1: // parent < cache
+		return iter.parent.Value()
+	case 0: // parent == cache
+		return iter.cache.Value()
+	case 1: // parent > cache
+		return iter.cache.Value()
+	default:
+		panic("invalid comparison result")
+	}
+}
+
+// Release implements Iterator
+// Releases both underlying iterators.
+func (iter *cacheMergeIterator) Release() {
+	iter.parent.Release()
+	iter.cache.Release()
+}
+
+// Like bytes.Compare but opposite if not ascending.
+func (iter *cacheMergeIterator) compare(a, b []byte) int {
+	if iter.ascending {
+		return bytes.Compare(a, b)
+	}
+	return bytes.Compare(a, b) * -1
+}
+
+// Skip all delete-items from the cache w/ `key < until`. After this function,
+// the cache is exhausted, its current item is a non-delete-item, or
+// `until <= key`.
+// If the current item is not a delete item, does nothing.
+// If `until` is nil, there is no limit.
+// Safe to call on an exhausted cache: the loop checks cache validity before
+// reading the current item.
+func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) {
+	for iter.cache.Valid() &&
+		iter.cache.Value() == nil &&
+		(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
+
+		iter.cache.Next()
+	}
+}
+
+// Fast forwards cache (or parent+cache in case of deleted items) until current
+// item exists, or until iterator becomes invalid.
+func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() {
+	for {
+		parentValid, cacheValid := iter.parent.Valid(), iter.cache.Valid()
+
+		// Invalid: both sides are exhausted.
+		if !parentValid && !cacheValid {
+			return
+		}
+
+		// Only the cache has items left: drop its leading deletes. Reading
+		// parent.Key() here would panic on an exhausted parent.
+		if !parentValid {
+			iter.skipCacheDeletes(nil)
+			return
+		}
+
+		// Only the parent has items left: nothing can shadow them.
+		if !cacheValid {
+			return
+		}
+
+		// Parent and Cache items exist.
+		keyP, keyC := iter.parent.Key(), iter.cache.Key()
+		switch iter.compare(keyP, keyC) {
+
+		// parent < cache
+		case -1:
+
+			// Parent exists.
+			return
+
+		// parent == cache
+		case 0:
+
+			// Skip over if cache item is a delete.
+			if iter.cache.Value() == nil {
+				iter.parent.Next()
+				iter.cache.Next()
+				continue
+			}
+			// Child shadows parent.
+			return
+
+		// parent > cache
+		case 1:
+
+			// Skip over if cache item is a delete.
+			if iter.cache.Value() == nil {
+				iter.skipCacheDeletes(keyP)
+				continue
+			}
+			// Child exists.
+			return
+		}
+	}
+}
+
+// If not valid, panics.
+// NOTE: May have side-effect of iterating over cache.
+func (iter *cacheMergeIterator) assertValid() {
+	if !iter.Valid() {
+		panic("iterator is invalid")
+	}
+}
diff --git a/store/firstlast.go b/store/firstlast.go
new file mode 100644
index 0000000000..1d5b06cd3f
--- /dev/null
+++ b/store/firstlast.go
@@ -0,0 +1,36 @@
+package store
+
+import "bytes"
+
+// iteratorFirst is a convenience implementation of IterKVStore.First built
+// on IterKVStore.Iterator. It returns the iterator's first item, or
+// ok == false when the iterator starts out invalid.
+func iteratorFirst(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
+	iter := st.Iterator(start, end)
+	// Release on every return path, including the early one below.
+	defer iter.Release()
+
+	if !iter.Valid() {
+		return kv, false
+	}
+	return KVPair{iter.Key(), iter.Value()}, true
+}
+
+// iteratorLast is a convenience implementation of IterKVStore.Last built on
+// IterKVStore.ReverseIterator. When the reverse iterator is empty it falls
+// back to a direct Get(start).
+func iteratorLast(st IterKVStore, start, end []byte) (kv KVPair, ok bool) {
+	iter := st.ReverseIterator(end, start)
+	// Release on every return path, including the early ones below.
+	defer iter.Release()
+
+	if !iter.Valid() {
+		if v := st.Get(start); v != nil {
+			return KVPair{cp(start), cp(v)}, true
+		}
+		return kv, false
+	}
+
+	if bytes.Equal(iter.Key(), end) {
+		// Skip this one, end is exclusive.
+ iter.Next() + if !iter.Valid() { + return kv, false + } + } + + return KVPair{iter.Key(), iter.Value()}, true +} diff --git a/store/iavlstore.go b/store/iavlstore.go index 772d3d0e80..a909414626 100644 --- a/store/iavlstore.go +++ b/store/iavlstore.go @@ -1,7 +1,6 @@ package store import ( - "bytes" "sync" "github.com/tendermint/iavl" @@ -79,16 +78,14 @@ func (st *iavlStore) CacheIterKVStore() CacheIterKVStore { } // Set implements IterKVStore. -func (st *iavlStore) Set(key, value []byte) (prev []byte) { - _, prev = st.tree.Get(key) +func (st *iavlStore) Set(key, value []byte) { st.tree.Set(key, value) - return prev } // Get implements IterKVStore. -func (st *iavlStore) Get(key []byte) (value []byte, exists bool) { +func (st *iavlStore) Get(key []byte) (value []byte) { _, v := st.tree.Get(key) - return v, (v != nil) + return v } // Has implements IterKVStore. @@ -97,8 +94,8 @@ func (st *iavlStore) Has(key []byte) (exists bool) { } // Remove implements IterKVStore. -func (st *iavlStore) Remove(key []byte) (prev []byte, removed bool) { - return st.tree.Remove(key) +func (st *iavlStore) Remove(key []byte) { + st.tree.Remove(key) } // Iterator implements IterKVStore. @@ -113,39 +110,17 @@ func (st *iavlStore) ReverseIterator(start, end []byte) Iterator { // First implements IterKVStore. func (st *iavlStore) First(start, end []byte) (kv KVPair, ok bool) { - iter := st.Iterator(start, end) - if !iter.Valid() { - return kv, false - } - defer iter.Release() - return KVPair{iter.Key(), iter.Value()}, true + return iteratorFirst(st, start, end) } // Last implements IterKVStore. func (st *iavlStore) Last(start, end []byte) (kv KVPair, ok bool) { - iter := st.ReverseIterator(end, start) - if !iter.Valid() { - if v, ok := st.Get(start); ok { - return KVPair{cp(start), cp(v)}, true - } else { - return kv, false - } - } - defer iter.Release() - - if bytes.Equal(iter.Key(), end) { - // Skip this one, end is exclusive. 
- iter.Next() - if !iter.Valid() { - return kv, false - } - } - - return KVPair{iter.Key(), iter.Value()}, true + return iteratorLast(st, start, end) } //---------------------------------------- +// Implements Iterator type iavlIterator struct { // Underlying store tree *iavl.Tree @@ -179,9 +154,9 @@ var _ Iterator = (*iavlIterator)(nil) // newIAVLIterator will create a new iavlIterator. // CONTRACT: Caller must release the iavlIterator, as each one creates a new // goroutine. -func newIAVLIterator(t *iavl.Tree, start, end []byte, ascending bool) *iavlIterator { - itr := &iavlIterator{ - tree: t, +func newIAVLIterator(tree *iavl.Tree, start, end []byte, ascending bool) *iavlIterator { + iter := &iavlIterator{ + tree: tree, start: cp(start), end: cp(end), ascending: ascending, @@ -189,116 +164,116 @@ func newIAVLIterator(t *iavl.Tree, start, end []byte, ascending bool) *iavlItera quitCh: make(chan struct{}), initCh: make(chan struct{}), } - go itr.iterateRoutine() - go itr.initRoutine() - return itr + go iter.iterateRoutine() + go iter.initRoutine() + return iter } // Run this to funnel items from the tree to iterCh. -func (ii *iavlIterator) iterateRoutine() { - ii.tree.IterateRange( - ii.start, ii.end, ii.ascending, +func (iter *iavlIterator) iterateRoutine() { + iter.tree.IterateRange( + iter.start, iter.end, iter.ascending, func(key, value []byte) bool { select { - case <-ii.quitCh: + case <-iter.quitCh: return true // done with iteration. - case ii.iterCh <- KVPair{key, value}: + case iter.iterCh <- KVPair{key, value}: return false // yay. } }, ) - close(ii.iterCh) // done. + close(iter.iterCh) // done. } // Run this to fetch the first item. 
-func (ii *iavlIterator) initRoutine() { - ii.receiveNext() - close(ii.initCh) +func (iter *iavlIterator) initRoutine() { + iter.receiveNext() + close(iter.initCh) } // Domain implements Iterator -func (ii *iavlIterator) Domain() (start, end []byte) { - return ii.start, ii.end +func (iter *iavlIterator) Domain() (start, end []byte) { + return iter.start, iter.end } // Valid implements Iterator -func (ii *iavlIterator) Valid() bool { - ii.waitInit() - ii.mtx.Lock() - defer ii.mtx.Unlock() +func (iter *iavlIterator) Valid() bool { + iter.waitInit() + iter.mtx.Lock() + defer iter.mtx.Unlock() - return !ii.invalid + return !iter.invalid } // Next implements Iterator -func (ii *iavlIterator) Next() { - ii.waitInit() - ii.mtx.Lock() - defer ii.mtx.Unlock() - ii.assertIsValid() +func (iter *iavlIterator) Next() { + iter.waitInit() + iter.mtx.Lock() + defer iter.mtx.Unlock() + iter.assertIsValid() - ii.receiveNext() + iter.receiveNext() } // Key implements Iterator -func (ii *iavlIterator) Key() []byte { - ii.waitInit() - ii.mtx.Lock() - defer ii.mtx.Unlock() - ii.assertIsValid() +func (iter *iavlIterator) Key() []byte { + iter.waitInit() + iter.mtx.Lock() + defer iter.mtx.Unlock() + iter.assertIsValid() - return ii.key + return iter.key } // Value implements Iterator -func (ii *iavlIterator) Value() []byte { - ii.waitInit() - ii.mtx.Lock() - defer ii.mtx.Unlock() - ii.assertIsValid() +func (iter *iavlIterator) Value() []byte { + iter.waitInit() + iter.mtx.Lock() + defer iter.mtx.Unlock() + iter.assertIsValid() - return ii.value + return iter.value } // Release implements Iterator -func (ii *iavlIterator) Release() { - close(ii.quitCh) +func (iter *iavlIterator) Release() { + close(iter.quitCh) } //---------------------------------------- -func (ii *iavlIterator) setNext(key, value []byte) { - ii.mtx.Lock() - defer ii.mtx.Unlock() - ii.assertIsValid() +func (iter *iavlIterator) setNext(key, value []byte) { + iter.mtx.Lock() + defer iter.mtx.Unlock() + iter.assertIsValid() 
- ii.key = key - ii.value = value + iter.key = key + iter.value = value } -func (ii *iavlIterator) setInvalid() { - ii.mtx.Lock() - defer ii.mtx.Unlock() - ii.assertIsValid() +func (iter *iavlIterator) setInvalid() { + iter.mtx.Lock() + defer iter.mtx.Unlock() + iter.assertIsValid() - ii.invalid = true + iter.invalid = true } -func (ii *iavlIterator) waitInit() { - <-ii.initCh +func (iter *iavlIterator) waitInit() { + <-iter.initCh } -func (ii *iavlIterator) receiveNext() { - kvPair, ok := <-ii.iterCh +func (iter *iavlIterator) receiveNext() { + kvPair, ok := <-iter.iterCh if ok { - ii.setNext(kvPair.Key, kvPair.Value) + iter.setNext(kvPair.Key, kvPair.Value) } else { - ii.setInvalid() + iter.setInvalid() } } -func (ii *iavlIterator) assertIsValid() { - if ii.invalid { +func (iter *iavlIterator) assertIsValid() { + if iter.invalid { panic("invalid iterator") } } diff --git a/store/memiterator.go b/store/memiterator.go new file mode 100644 index 0000000000..ab5d9b621b --- /dev/null +++ b/store/memiterator.go @@ -0,0 +1,52 @@ +package store + +// Iterates over iterKVCache items. +// if key is nil, means it was deleted. +// Implements Iterator. 
+type memIterator struct {
+	start, end []byte   // domain reported by Domain()
+	items      []KVPair // remaining items; items[0] is the current one
+}
+
+// newMemIterator returns an iterator that yields items in the order given.
+func newMemIterator(start, end []byte, items []KVPair) *memIterator {
+	it := new(memIterator)
+	it.start, it.end, it.items = start, end, items
+	return it
+}
+
+// Domain returns the start and end passed to newMemIterator.
+func (it *memIterator) Domain() ([]byte, []byte) {
+	return it.start, it.end
+}
+
+// Valid reports whether any items remain.
+func (it *memIterator) Valid() bool {
+	return len(it.items) != 0
+}
+
+// assertValid panics when no items remain.
+func (it *memIterator) assertValid() {
+	if it.Valid() {
+		return
+	}
+	panic("memIterator is invalid")
+}
+
+// Next drops the current item. Panics if the iterator is invalid.
+func (it *memIterator) Next() {
+	it.assertValid()
+	it.items = it.items[1:]
+}
+
+// Key returns the current item's key. Panics if the iterator is invalid.
+func (it *memIterator) Key() []byte {
+	it.assertValid()
+	head := it.items[0]
+	return head.Key
+}
+
+// Value returns the current item's value. Panics if the iterator is invalid.
+func (it *memIterator) Value() []byte {
+	it.assertValid()
+	head := it.items[0]
+	return head.Value
+}
+
+// Release clears the domain and the remaining items; the iterator is
+// invalid afterwards.
+func (it *memIterator) Release() {
+	*it = memIterator{}
+}
diff --git a/store/types.go b/store/types.go
index 2e17f65255..caa6eafa4f 100644
--- a/store/types.go
+++ b/store/types.go
@@ -52,10 +52,18 @@ type CommitStoreLoader func(id CommitID) (CommitStore, error)
 
 // KVStore is a simple interface to get/set data
 type KVStore interface {
-	Set(key, value []byte) (prev []byte)
-	Get(key []byte) (value []byte, exists bool)
-	Has(key []byte) (exists bool)
-	Remove(key []byte) (prev []byte, removed bool)
+
+	// Get returns nil iff key doesn't exist. Panics on nil key.
+	Get(key []byte) []byte
+
+	// Set sets the key. Panics on nil key.
+	Set(key, value []byte)
+
+	// Has checks if a key exists. Panics on nil key.
+	Has(key []byte) bool
+
+	// Remove deletes the key. Panics on nil key.
+	Remove(key []byte)
 
 	// CacheKVStore() wraps a thing with a cache. After
 	// calling .Write() on the CacheKVStore, all previous