This commit is contained in:
vyzo 2021-07-04 12:43:05 +03:00
parent 190cb18ab0
commit 95c3aaec9a

View File

@ -2,6 +2,7 @@ package splitstore
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -14,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/chain/types/mock" "github.com/filecoin-project/lotus/chain/types/mock"
blocks "github.com/ipfs/go-block-format" blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore" datastore "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync" dssync "github.com/ipfs/go-datastore/sync"
logging "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2"
@ -22,7 +24,6 @@ import (
func init() { func init() {
CompactionThreshold = 5 CompactionThreshold = 5
CompactionBoundary = 2 CompactionBoundary = 2
CompactionSlack = 2
logging.SetLogLevel("splitstore", "DEBUG") logging.SetLogLevel("splitstore", "DEBUG")
} }
@ -31,8 +32,8 @@ func testSplitStore(t *testing.T, cfg *Config) {
// the myriads of stores // the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore()) ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := blockstore.NewMemorySync() hot := newMockStore()
cold := blockstore.NewMemorySync() cold := newMockStore()
// this is necessary to avoid the garbage mock puts in the blocks // this is necessary to avoid the garbage mock puts in the blocks
garbage := blocks.NewBlock([]byte{1, 2, 3}) garbage := blocks.NewBlock([]byte{1, 2, 3})
@ -110,18 +111,12 @@ func testSplitStore(t *testing.T, cfg *Config) {
} }
// count objects in the cold and hot stores // count objects in the cold and hot stores
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
countBlocks := func(bs blockstore.Blockstore) int { countBlocks := func(bs blockstore.Blockstore) int {
count := 0 count := 0
ch, err := bs.AllKeysChan(ctx) bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
if err != nil {
t.Fatal(err)
}
for range ch {
count++ count++
} return nil
})
return count return count
} }
@ -145,20 +140,20 @@ func testSplitStore(t *testing.T, cfg *Config) {
coldCnt = countBlocks(cold) coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot) hotCnt = countBlocks(hot)
if coldCnt != 6 { if coldCnt != 2 {
t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt) t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt)
} }
if hotCnt != 7 { if hotCnt != 11 {
t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt) t.Errorf("expected %d hot blocks, but got %d", 11, hotCnt)
} }
// Make sure we can revert without panicking. // Make sure we can revert without panicking.
chain.revert(2) chain.revert(2)
} }
func TestSplitStoreSimpleCompaction(t *testing.T) { func TestSplitStoreCompaction(t *testing.T) {
testSplitStore(t, &Config{TrackingStoreType: "mem"}) testSplitStore(t, &Config{MarkSetType: "mapts"})
} }
type mockChain struct { type mockChain struct {
@ -231,3 +226,106 @@ func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) { func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
c.listener = change c.listener = change
} }
type mockStore struct {
mx sync.Mutex
set map[cid.Cid]blocks.Block
}
func newMockStore() *mockStore {
return &mockStore{set: make(map[cid.Cid]blocks.Block)}
}
func (b *mockStore) Has(cid cid.Cid) (bool, error) {
b.mx.Lock()
defer b.mx.Unlock()
_, ok := b.set[cid]
return ok, nil
}
func (b *mockStore) HashOnRead(hor bool) {}
func (b *mockStore) Get(cid cid.Cid) (blocks.Block, error) {
b.mx.Lock()
defer b.mx.Unlock()
blk, ok := b.set[cid]
if !ok {
return nil, blockstore.ErrNotFound
}
return blk, nil
}
func (b *mockStore) GetSize(cid cid.Cid) (int, error) {
blk, err := b.Get(cid)
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
func (b *mockStore) View(cid cid.Cid, f func([]byte) error) error {
blk, err := b.Get(cid)
if err != nil {
return err
}
return f(blk.RawData())
}
func (b *mockStore) Put(blk blocks.Block) error {
b.mx.Lock()
defer b.mx.Unlock()
b.set[blk.Cid()] = blk
return nil
}
func (b *mockStore) PutMany(blks []blocks.Block) error {
b.mx.Lock()
defer b.mx.Unlock()
for _, blk := range blks {
b.set[blk.Cid()] = blk
}
return nil
}
func (b *mockStore) DeleteBlock(cid cid.Cid) error {
b.mx.Lock()
defer b.mx.Unlock()
delete(b.set, cid)
return nil
}
func (b *mockStore) DeleteMany(cids []cid.Cid) error {
b.mx.Lock()
defer b.mx.Unlock()
for _, c := range cids {
delete(b.set, c)
}
return nil
}
func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}
func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
b.mx.Lock()
defer b.mx.Unlock()
for c := range b.set {
err := f(c)
if err != nil {
return err
}
}
return nil
}
func (b *mockStore) Close() error {
return nil
}