From 72b0ed004bb536524c708f3ddbd8eedb37eabac7 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 13 Dec 2017 01:17:20 -0500 Subject: [PATCH] wip: tests and fixes for kvstore iteration --- store/cachekvstore_test.go | 483 ++++++++++++++++++++++++++++++++++-- store/cachemergeiterator.go | 84 +++---- store/firstlast.go | 4 +- store/iavlstore.go | 4 +- store/memiterator.go | 15 +- store/types.go | 13 + 6 files changed, 527 insertions(+), 76 deletions(-) diff --git a/store/cachekvstore_test.go b/store/cachekvstore_test.go index 7b55e55a13..2564427dba 100644 --- a/store/cachekvstore_test.go +++ b/store/cachekvstore_test.go @@ -3,63 +3,494 @@ package store import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" ) +func newCacheKVStore() CacheKVStore { + mem := dbm.NewMemDB() + return NewCacheKVStore(mem) +} + +func keyFmt(i int) []byte { return bz(cmn.Fmt("key%0.8d", i)) } +func valFmt(i int) []byte { return bz(cmn.Fmt("value%0.8d", i)) } + func TestCacheKVStore(t *testing.T) { mem := dbm.NewMemDB() st := NewCacheKVStore(mem) - require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty") + require.Empty(t, st.Get(keyFmt(1)), "Expected `key1` to be empty") - mem.Set(bz("key1"), bz("value1")) - st.Set(bz("key1"), bz("value1")) - require.Equal(t, bz("value1"), st.Get(bz("key1"))) + // put something in mem and in cache + mem.Set(keyFmt(1), valFmt(1)) + st.Set(keyFmt(1), valFmt(1)) + require.Equal(t, valFmt(1), st.Get(keyFmt(1))) - st.Set(bz("key1"), bz("value2")) - require.Equal(t, bz("value2"), st.Get(bz("key1"))) - require.Equal(t, bz("value1"), mem.Get(bz("key1"))) + // update it in cache, shoudn't change mem + st.Set(keyFmt(1), valFmt(2)) + require.Equal(t, valFmt(2), st.Get(keyFmt(1))) + require.Equal(t, valFmt(1), mem.Get(keyFmt(1))) + // write it. should change mem st.Write() - require.Equal(t, bz("value2"), mem.Get(bz("key1"))) + require.Equal(t, valFmt(2), mem.Get(keyFmt(1))) + require.Equal(t, valFmt(2), st.Get(keyFmt(1))) + // more writes and checks st.Write() st.Write() - require.Equal(t, bz("value2"), mem.Get(bz("key1"))) + require.Equal(t, valFmt(2), mem.Get(keyFmt(1))) + require.Equal(t, valFmt(2), st.Get(keyFmt(1))) + // make a new one, check it st = NewCacheKVStore(mem) - st.Delete(bz("key1")) - require.Empty(t, st.Get(bz("key1"))) - require.Equal(t, mem.Get(bz("key1")), bz("value2")) + require.Equal(t, valFmt(2), st.Get(keyFmt(1))) + // make a new one and delete - should not be removed from mem + st = NewCacheKVStore(mem) + st.Delete(keyFmt(1)) + require.Empty(t, st.Get(keyFmt(1))) + require.Equal(t, mem.Get(keyFmt(1)), valFmt(2)) + + // Write. should now be removed from both st.Write() - require.Empty(t, st.Get(bz("key1")), "Expected `key1` to be empty") - require.Empty(t, mem.Get(bz("key1")), "Expected `key1` to be empty") + require.Empty(t, st.Get(keyFmt(1)), "Expected `key1` to be empty") + require.Empty(t, mem.Get(keyFmt(1)), "Expected `key1` to be empty") } func TestCacheKVStoreNested(t *testing.T) { mem := dbm.NewMemDB() st := NewCacheKVStore(mem) - st.Set(bz("key1"), bz("value1")) - require.Empty(t, mem.Get(bz("key1"))) - require.Equal(t, bz("value1"), st.Get(bz("key1"))) + // set. check its there on st and not on mem. + st.Set(keyFmt(1), valFmt(1)) + require.Empty(t, mem.Get(keyFmt(1))) + require.Equal(t, valFmt(1), st.Get(keyFmt(1))) + + // make a new from st and check st2 := NewCacheKVStore(st) - require.Equal(t, bz("value1"), st2.Get(bz("key1"))) + require.Equal(t, valFmt(1), st2.Get(keyFmt(1))) - st2.Set(bz("key1"), bz("VALUE2")) - require.Equal(t, []byte(nil), mem.Get(bz("key1"))) - require.Equal(t, bz("value1"), st.Get(bz("key1"))) - require.Equal(t, bz("VALUE2"), st2.Get(bz("key1"))) + // update the value on st2, check it only effects st2 + st2.Set(keyFmt(1), valFmt(3)) + require.Equal(t, []byte(nil), mem.Get(keyFmt(1))) + require.Equal(t, valFmt(1), st.Get(keyFmt(1))) + require.Equal(t, valFmt(3), st2.Get(keyFmt(1))) + // st2 writes to its parent, st. doesnt effect mem st2.Write() - require.Equal(t, []byte(nil), mem.Get(bz("key1"))) - require.Equal(t, bz("VALUE2"), st.Get(bz("key1"))) + require.Equal(t, []byte(nil), mem.Get(keyFmt(1))) + require.Equal(t, valFmt(3), st.Get(keyFmt(1))) + // updates mem st.Write() - require.Equal(t, bz("VALUE2"), mem.Get(bz("key1"))) - + require.Equal(t, valFmt(3), mem.Get(keyFmt(1))) } +func TestCacheKVIteratorBounds(t *testing.T) { + st := newCacheKVStore() + + // set some items + nItems := 5 + for i := 0; i < nItems; i++ { + st.Set(keyFmt(i), valFmt(i)) + } + + // iterate over all of them + itr := st.Iterator(nil, nil) + var i = 0 + for ; itr.Valid(); itr.Next() { + k, v := itr.Key(), itr.Value() + assert.Equal(t, keyFmt(i), k) + assert.Equal(t, valFmt(i), v) + i += 1 + } + assert.Equal(t, nItems, i) + + // iterate over none + itr = st.Iterator(bz("money"), nil) + i = 0 + for ; itr.Valid(); itr.Next() { + i += 1 + } + assert.Equal(t, 0, i) + + // iterate over lower + itr = st.Iterator(keyFmt(0), keyFmt(3)) + i = 0 + for ; itr.Valid(); itr.Next() { + k, v := itr.Key(), itr.Value() + assert.Equal(t, keyFmt(i), k) + assert.Equal(t, valFmt(i), v) + i += 1 + } + assert.Equal(t, 3, i) + + // iterate over upper + itr = st.Iterator(keyFmt(2), keyFmt(4)) + i = 2 + for ; itr.Valid(); itr.Next() { + k, v := itr.Key(), itr.Value() + assert.Equal(t, keyFmt(i), k) + assert.Equal(t, valFmt(i), v) + i += 1 + } + assert.Equal(t, 4, i) +} + +func TestCacheKVMergeIteratorBasics(t *testing.T) { + st := newCacheKVStore() + + // set and delete an item in the cache, iterator should be empty + k, v := keyFmt(0), valFmt(0) + st.Set(k, v) + st.Delete(k) + assertIterateDomain(t, st, 0) + + // now set it and assert its there + st.Set(k, v) + assertIterateDomain(t, st, 1) + + // write it and assert its there + st.Write() + assertIterateDomain(t, st, 1) + + // remove it in cache and assert its not + st.Delete(k) + assertIterateDomain(t, st, 0) + + // write the delete and assert its not there + st.Write() + assertIterateDomain(t, st, 0) + + // add two keys and assert theyre there + k1, v1 := keyFmt(1), valFmt(1) + st.Set(k, v) + st.Set(k1, v1) + assertIterateDomain(t, st, 2) + + // write it and assert theyre there + st.Write() + assertIterateDomain(t, st, 2) + + // remove one in cache and assert its not + st.Delete(k1) + assertIterateDomain(t, st, 1) + + // write the delete and assert its not there + st.Write() + assertIterateDomain(t, st, 1) + + // delete the other key in cache and asserts its empty + st.Delete(k) + assertIterateDomain(t, st, 0) +} + +func TestCacheKVMergeIteratorDeleteLast(t *testing.T) { + st := newCacheKVStore() + + // set some items and write them + nItems := 5 + for i := 0; i < nItems; i++ { + st.Set(keyFmt(i), valFmt(i)) + } + st.Write() + + // set some more items and leave dirty + for i := nItems; i < nItems*2; i++ { + st.Set(keyFmt(i), valFmt(i)) + } + + // iterate over all of them + assertIterateDomain(t, st, nItems*2) + + // delete them all + for i := 0; i < nItems*2; i++ { + last := nItems*2 - 1 - i + st.Delete(keyFmt(last)) + assertIterateDomain(t, st, last) + } +} + +func TestCacheKVMergeIteratorDeletes(t *testing.T) { + st := newCacheKVStore() + truth := dbm.NewMemDB() + + // set some items and write them + nItems := 10 + for i := 0; i < nItems; i++ { + doOp(st, truth, opSet, i) + } + st.Write() + + // delete every other item, starting from 0 + for i := 0; i < nItems; i += 2 { + doOp(st, truth, opDel, i) + assertIterateDomainCompare(t, st, truth) + } + + // reset + st = newCacheKVStore() + truth = dbm.NewMemDB() + + // set some items and write them + for i := 0; i < nItems; i++ { + doOp(st, truth, opSet, i) + } + st.Write() + + // delete every other item, starting from 1 + for i := 1; i < nItems; i += 2 { + doOp(st, truth, opDel, i) + assertIterateDomainCompare(t, st, truth) + } +} + +func TestCacheKVMergeIteratorChunks(t *testing.T) { + st := newCacheKVStore() + + // Use the truth to check values on the merge iterator + truth := dbm.NewMemDB() + + // sets to the parent + setRange(st, truth, 0, 20) + setRange(st, truth, 40, 60) + st.Write() + + // sets to the cache + setRange(st, truth, 20, 40) + setRange(st, truth, 60, 80) + assertIterateDomainCheck(t, st, truth, []keyRange{{0, 80}}) + + // remove some parents and some cache + deleteRange(st, truth, 15, 25) + assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 80}}) + + // remove some parents and some cache + deleteRange(st, truth, 35, 45) + assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 35}, {45, 80}}) + + // write, add more to the cache, and delete some cache + st.Write() + setRange(st, truth, 38, 42) + deleteRange(st, truth, 40, 43) + assertIterateDomainCheck(t, st, truth, []keyRange{{0, 15}, {25, 35}, {38, 40}, {45, 80}}) +} + +func TestCacheKVMergeIteratorRandom(t *testing.T) { + st := newCacheKVStore() + truth := dbm.NewMemDB() + + start, end := 25, 75 + max := 100 + setRange(st, truth, start, end) + + // do an op, test the iterator + for i := 0; i < 2000; i++ { + doRandomOp(st, truth, max) + assertIterateDomainCompare(t, st, truth) + } +} + +//------------------------------------------------------------------------------------------- +// do some random ops + +const ( + opSet = 0 + opSetRange = 1 + opDel = 2 + opDelRange = 3 + opWrite = 4 + + totalOps = 5 // number of possible operations +) + +func randInt(n int) int { + return cmn.RandInt() % n +} + +// useful for replaying a error case if we find one +func doOp(st CacheKVStore, truth dbm.DB, op int, args ...int) { + switch op { + case opSet: + k := args[0] + st.Set(keyFmt(k), valFmt(k)) + truth.Set(keyFmt(k), valFmt(k)) + case opSetRange: + start := args[0] + end := args[1] + setRange(st, truth, start, end) + case opDel: + k := args[0] + st.Delete(keyFmt(k)) + truth.Delete(keyFmt(k)) + case opDelRange: + start := args[0] + end := args[1] + deleteRange(st, truth, start, end) + case opWrite: + st.Write() + } +} + +func doRandomOp(st CacheKVStore, truth dbm.DB, maxKey int) { + r := randInt(totalOps) + switch r { + case opSet: + k := randInt(maxKey) + st.Set(keyFmt(k), valFmt(k)) + truth.Set(keyFmt(k), valFmt(k)) + case opSetRange: + start := randInt(maxKey - 2) + end := randInt(maxKey-start) + start + setRange(st, truth, start, end) + case opDel: + k := randInt(maxKey) + st.Delete(keyFmt(k)) + truth.Delete(keyFmt(k)) + case opDelRange: + start := randInt(maxKey - 2) + end := randInt(maxKey-start) + start + deleteRange(st, truth, start, end) + case opWrite: + st.Write() + } +} + +//------------------------------------------------------------------------------------------- + +// iterate over whole domain +func assertIterateDomain(t *testing.T, st KVStore, expectedN int) { + itr := st.Iterator(nil, nil) + var i = 0 + for ; itr.Valid(); itr.Next() { + k, v := itr.Key(), itr.Value() + assert.Equal(t, keyFmt(i), k) + assert.Equal(t, valFmt(i), v) + i += 1 + } + assert.Equal(t, expectedN, i) +} + +func assertIterateDomainCheck(t *testing.T, st KVStore, mem dbm.DB, r []keyRange) { + // iterate over each and check they match the other + itr := st.Iterator(nil, nil) + itr2 := mem.Iterator(nil, nil) // ground truth + + krc := newKeyRangeCounter(r) + i := 0 + + for ; krc.valid(); krc.next() { + assert.True(t, itr.Valid()) + assert.True(t, itr2.Valid()) + + // check the key/val matches the ground truth + k, v := itr.Key(), itr.Value() + k2, v2 := itr2.Key(), itr2.Value() + assert.Equal(t, k, k2) + assert.Equal(t, v, v2) + + // check they match the counter + assert.Equal(t, k, keyFmt(krc.key())) + + itr.Next() + itr2.Next() + i += 1 + } + + assert.False(t, itr.Valid()) + assert.False(t, itr2.Valid()) +} + +func assertIterateDomainCompare(t *testing.T, st KVStore, mem dbm.DB) { + // iterate over each and check they match the other + itr := st.Iterator(nil, nil) + itr2 := mem.Iterator(nil, nil) // ground truth + checkIterators(t, itr, itr2) + checkIterators(t, itr2, itr) +} + +func checkIterators(t *testing.T, itr, itr2 Iterator) { + for ; itr.Valid(); itr.Next() { + assert.True(t, itr2.Valid()) + k, v := itr.Key(), itr.Value() + k2, v2 := itr2.Key(), itr2.Value() + assert.Equal(t, k, k2) + assert.Equal(t, v, v2) + itr2.Next() + } + assert.False(t, itr.Valid()) + assert.False(t, itr2.Valid()) +} + +//-------------------------------------------------------- + +func setRange(st KVStore, mem dbm.DB, start, end int) { + for i := start; i < end; i++ { + st.Set(keyFmt(i), valFmt(i)) + mem.Set(keyFmt(i), valFmt(i)) + } +} + +func deleteRange(st KVStore, mem dbm.DB, start, end int) { + for i := start; i < end; i++ { + st.Delete(keyFmt(i)) + mem.Delete(keyFmt(i)) + } +} + +//-------------------------------------------------------- + +type keyRange struct { + start int + end int +} + +func (kr keyRange) len() int { + return kr.end - kr.start +} + +func newKeyRangeCounter(kr []keyRange) *keyRangeCounter { + return &keyRangeCounter{keyRanges: kr} +} + +// we can iterate over this and make sure our real iterators have all the right keys +type keyRangeCounter struct { + rangeIdx int + idx int + keyRanges []keyRange +} + +func (krc *keyRangeCounter) valid() bool { + maxRangeIdx := len(krc.keyRanges) - 1 + maxRange := krc.keyRanges[maxRangeIdx] + + // if we're not in the max range, we're valid + if krc.rangeIdx <= maxRangeIdx && + krc.idx < maxRange.len() { + return true + } + + return false +} + +func (krc *keyRangeCounter) next() { + thisKeyRange := krc.keyRanges[krc.rangeIdx] + if krc.idx == thisKeyRange.len()-1 { + krc.rangeIdx += 1 + krc.idx = 0 + } else { + krc.idx += 1 + } +} + +func (krc *keyRangeCounter) key() int { + thisKeyRange := krc.keyRanges[krc.rangeIdx] + return thisKeyRange.start + krc.idx +} + +//-------------------------------------------------------- + func bz(s string) []byte { return []byte(s) } diff --git a/store/cachemergeiterator.go b/store/cachemergeiterator.go index 68f63a9b53..b6c50c42cc 100644 --- a/store/cachemergeiterator.go +++ b/store/cachemergeiterator.go @@ -1,6 +1,8 @@ package store -import "bytes" +import ( + "bytes" +) // cacheMergeIterator merges a parent Iterator and a cache Iterator. // The cache iterator may return nil keys to signal that an item @@ -46,15 +48,7 @@ func (iter *cacheMergeIterator) Domain() (start, end []byte) { // Valid implements Iterator. func (iter *cacheMergeIterator) Valid() bool { - - // If parent is valid, this is valid. - if iter.parent.Valid() { - return true - } - - // Otherwise depends on child. - iter.skipCacheDeletes(nil) - return iter.cache.Valid() + return iter.skipUntilExistsOrInvalid() } // Next implements Iterator @@ -76,8 +70,7 @@ func (iter *cacheMergeIterator) Next() { // Both are valid. Compare keys. keyP, keyC := iter.parent.Key(), iter.cache.Key() - cmp := iter.compare(keyP, keyC) - switch cmp { + switch iter.compare(keyP, keyC) { case -1: // parent < cache iter.parent.Next() case 0: // parent == cache @@ -148,10 +141,10 @@ func (iter *cacheMergeIterator) Value() []byte { } } -// Release implements Iterator -func (iter *cacheMergeIterator) Release() { - iter.parent.Release() - iter.cache.Release() +// Close implements Iterator +func (iter *cacheMergeIterator) Close() { + iter.parent.Close() + iter.cache.Close() } // Like bytes.Compare but opposite if not ascending. @@ -164,44 +157,46 @@ func (iter *cacheMergeIterator) compare(a, b []byte) int { } // Skip all delete-items from the cache w/ `key < until`. After this function, -// current item is a non-delete-item, or `until <= key`. -// If the current item is not a delete item, does noting. -// If `until` is nil, there is no limit. +// current cache item is a non-delete-item, or `until <= key`. +// If the current cache item is not a delete item, does nothing. +// If `until` is nil, there is no limit, and cache may end up invalid. // CONTRACT: cache is valid. func (iter *cacheMergeIterator) skipCacheDeletes(until []byte) { - for (until == nil || iter.compare(iter.cache.Key(), until) < 0) && - iter.cache.Value() == nil { + for iter.cache.Valid() && + iter.cache.Value() == nil && + (until == nil || iter.compare(iter.cache.Key(), until) < 0) { iter.cache.Next() - if !iter.cache.Valid() { - return - } } } // Fast forwards cache (or parent+cache in case of deleted items) until current // item exists, or until iterator becomes invalid. -func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() { +// Returns whether the iterator is valid. +func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() bool { for { - // Invalid. - if !iter.Valid() { - return + // If parent is invalid, fast-forward cache. + if !iter.parent.Valid() { + iter.skipCacheDeletes(nil) + return iter.cache.Valid() } + // Parent is valid. - // Parent and Cache items exist. - keyP, keyC := iter.parent.Key(), iter.cache.Key() - cmp := iter.compare(keyP, keyC) - switch cmp { + if !iter.cache.Valid() { + return true + } + // Parent is valid, cache is valid. - // parent < cache - case -1: + // Compare parent and cache. + keyP := iter.parent.Key() + keyC := iter.cache.Key() + switch iter.compare(keyP, keyC) { - // Parent exists. - return + case -1: // parent < cache. + return true - // parent == cache - case 0: + case 0: // parent == cache. // Skip over if cache item is a delete. valueC := iter.cache.Value() @@ -210,11 +205,11 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() { iter.cache.Next() continue } - // Child shadows parent. - return + // Cache is not a delete. - // parent > cache - case 1: + return true // cache exists. + + case 1: // cache < parent // Skip over if cache item is a delete. valueC := iter.cache.Value() @@ -222,8 +217,9 @@ func (iter *cacheMergeIterator) skipUntilExistsOrInvalid() { iter.skipCacheDeletes(keyP) continue } - // Child exists. - return + // Cache is not a delete. + + return true // cache exists. } } } diff --git a/store/firstlast.go b/store/firstlast.go index 57910a67ad..661a08c263 100644 --- a/store/firstlast.go +++ b/store/firstlast.go @@ -8,7 +8,7 @@ func First(st KVStore, start, end []byte) (kv KVPair, ok bool) { if !iter.Valid() { return kv, false } - defer iter.Release() + defer iter.Close() return KVPair{iter.Key(), iter.Value()}, true } @@ -23,7 +23,7 @@ func Last(st KVStore, start, end []byte) (kv KVPair, ok bool) { return kv, false } } - defer iter.Release() + defer iter.Close() if bytes.Equal(iter.Key(), end) { // Skip this one, end is exclusive. diff --git a/store/iavlstore.go b/store/iavlstore.go index 9816d5bc51..f7dc3d6f54 100644 --- a/store/iavlstore.go +++ b/store/iavlstore.go @@ -234,8 +234,8 @@ func (iter *iavlIterator) Value() []byte { return iter.value } -// Release implements Iterator -func (iter *iavlIterator) Release() { +// Close implements Iterator +func (iter *iavlIterator) Close() { close(iter.quitCh) } diff --git a/store/memiterator.go b/store/memiterator.go index ab5d9b621b..8c1283958b 100644 --- a/store/memiterator.go +++ b/store/memiterator.go @@ -1,5 +1,9 @@ package store +import ( + dbm "github.com/tendermint/tmlibs/db" +) + // Iterates over iterKVCache items. // if key is nil, means it was deleted. // Implements Iterator. @@ -9,10 +13,17 @@ type memIterator struct { } func newMemIterator(start, end []byte, items []KVPair) *memIterator { + itemsInDomain := make([]KVPair, 0) + for _, item := range items { + ascending := keyCompare(start, end) < 0 + if dbm.IsKeyInDomain(item.Key, start, end, !ascending) { + itemsInDomain = append(itemsInDomain, item) + } + } return &memIterator{ start: start, end: end, - items: items, + items: itemsInDomain, } } @@ -45,7 +56,7 @@ func (mi *memIterator) Value() []byte { return mi.items[0].Value } -func (mi *memIterator) Release() { +func (mi *memIterator) Close() { mi.start = nil mi.end = nil mi.items = nil diff --git a/store/types.go b/store/types.go index 22b4e48d93..d132d9983c 100644 --- a/store/types.go +++ b/store/types.go @@ -1,6 +1,8 @@ package store import ( + "bytes" + "github.com/tendermint/go-wire/data" "github.com/tendermint/tmlibs/db" ) @@ -128,3 +130,14 @@ type CommitID struct { func (cid CommitID) IsZero() bool { return cid.Version == 0 && len(cid.Hash) == 0 } + +// bytes.Compare but bounded on both sides by nil. +// both (k1, nil) and (nil, k2) return -1 +func keyCompare(k1, k2 []byte) int { + if k1 == nil && k2 == nil { + return 0 + } else if k1 == nil || k2 == nil { + return -1 + } + return bytes.Compare(k1, k2) +}