add splitstore unit test
This commit is contained in:
parent
0a2f2cf00d
commit
09f5ba177a
@ -79,7 +79,7 @@ const (
|
||||
type Config struct {
|
||||
// TrackingStore is the type of tracking store to use.
|
||||
//
|
||||
// Supported values are: "bolt" (default if omitted).
|
||||
// Supported values are: "bolt" (default if omitted), "mem" (for tests and readonly access).
|
||||
TrackingStoreType string
|
||||
|
||||
// MarkSetType is the type of mark set to use.
|
||||
|
248
blockstore/splitstore/splitstore_test.go
Normal file
248
blockstore/splitstore/splitstore_test.go
Normal file
@ -0,0 +1,248 @@
|
||||
package splitstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/blockstore"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
"github.com/filecoin-project/lotus/chain/types/mock"
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
datastore "github.com/ipfs/go-datastore"
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
)
|
||||
|
||||
func init() {
|
||||
CompactionThreshold = 5
|
||||
CompactionCold = 1
|
||||
CompactionBoundary = 2
|
||||
logging.SetLogLevel("splitstore", "DEBUG")
|
||||
}
|
||||
|
||||
func testSplitStore(t *testing.T, cfg *Config) {
|
||||
t.Helper()
|
||||
|
||||
chain := &mockChain{}
|
||||
// genesis
|
||||
genBlock := mock.MkBlock(nil, 0, 0)
|
||||
genTs := mock.TipSet(genBlock)
|
||||
chain.push(genTs)
|
||||
|
||||
// the myriads of stores
|
||||
ds := datastore.NewMapDatastore()
|
||||
hot := blockstore.NewMemorySync()
|
||||
cold := blockstore.NewMemorySync()
|
||||
|
||||
// put the genesis block to cold store
|
||||
blk, err := genBlock.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = cold.Put(blk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// open the splitstore
|
||||
ss, err := Open("", ds, hot, cold, cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer ss.Close()
|
||||
|
||||
err = ss.Start(chain)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// make some tipsets, but not enough to cause compaction
|
||||
mkBlock := func(curTs *types.TipSet, i int) *types.TipSet {
|
||||
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
||||
sblk, err := blk.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ss.Put(sblk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ts := mock.TipSet(blk)
|
||||
chain.push(ts)
|
||||
|
||||
return ts
|
||||
}
|
||||
|
||||
mkGarbageBlock := func(curTs *types.TipSet, i int) {
|
||||
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
||||
sblk, err := blk.ToStorageBlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ss.Put(sblk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
curTs := genTs
|
||||
for i := 1; i < 5; i++ {
|
||||
curTs = mkBlock(curTs, i)
|
||||
}
|
||||
|
||||
mkGarbageBlock(genTs, 1)
|
||||
|
||||
// count objects in the cold and hot stores
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
countBlocks := func(bs blockstore.Blockstore) int {
|
||||
count := 0
|
||||
ch, err := bs.AllKeysChan(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _ = range ch {
|
||||
count++
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
coldCnt := countBlocks(cold)
|
||||
hotCnt := countBlocks(hot)
|
||||
|
||||
if coldCnt != 1 {
|
||||
t.Fatalf("expected %d blocks, but got %d", 1, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 4 {
|
||||
t.Fatalf("expected %d blocks, but got %d", 4, hotCnt)
|
||||
}
|
||||
|
||||
// trigger a compaction
|
||||
for i := 5; i < 10; i++ {
|
||||
curTs = mkBlock(curTs, i)
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
coldCnt = countBlocks(cold)
|
||||
hotCnt = countBlocks(hot)
|
||||
|
||||
if !cfg.EnableFullCompaction {
|
||||
if coldCnt != 5 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 5, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 5 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 5, hotCnt)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.EnableFullCompaction && !cfg.EnableGC {
|
||||
if coldCnt != 3 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 3, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 7 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt)
|
||||
}
|
||||
}
|
||||
|
||||
if cfg.EnableFullCompaction && cfg.EnableGC {
|
||||
if coldCnt != 2 {
|
||||
t.Fatalf("expected %d cold blocks, but got %d", 2, coldCnt)
|
||||
}
|
||||
|
||||
if hotCnt != 7 {
|
||||
t.Fatalf("expected %d hot blocks, but got %d", 7, hotCnt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitStoreSimpleCompaction(t *testing.T) {
|
||||
testSplitStore(t, &Config{TrackingStoreType: "mem"})
|
||||
}
|
||||
|
||||
func TestSplitStoreFullCompactionWithoutGC(t *testing.T) {
|
||||
testSplitStore(t, &Config{
|
||||
TrackingStoreType: "mem",
|
||||
EnableFullCompaction: true,
|
||||
})
|
||||
}
|
||||
|
||||
func TestSplitStoreFullCompactionWithGC(t *testing.T) {
|
||||
testSplitStore(t, &Config{
|
||||
TrackingStoreType: "mem",
|
||||
EnableFullCompaction: true,
|
||||
EnableGC: true,
|
||||
})
|
||||
}
|
||||
|
||||
type mockChain struct {
|
||||
sync.Mutex
|
||||
tipsets []*types.TipSet
|
||||
listener func(revert []*types.TipSet, apply []*types.TipSet) error
|
||||
}
|
||||
|
||||
func (c *mockChain) push(ts *types.TipSet) {
|
||||
c.Lock()
|
||||
c.tipsets = append(c.tipsets, ts)
|
||||
c.Unlock()
|
||||
|
||||
if c.listener != nil {
|
||||
err := c.listener(nil, []*types.TipSet{ts})
|
||||
if err != nil {
|
||||
log.Errorf("mockchain: error dispatching listener: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
iEpoch := int(epoch)
|
||||
if iEpoch > len(c.tipsets) {
|
||||
return nil, fmt.Errorf("bad epoch %d", epoch)
|
||||
}
|
||||
|
||||
return c.tipsets[iEpoch-1], nil
|
||||
}
|
||||
|
||||
func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
return c.tipsets[len(c.tipsets)-1]
|
||||
}
|
||||
|
||||
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
|
||||
c.listener = change
|
||||
}
|
||||
|
||||
func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
|
||||
start := int(ts.Height()) - 1
|
||||
end := start - int(epochs)
|
||||
if end < 0 {
|
||||
end = -1
|
||||
}
|
||||
for i := start; i > end; i-- {
|
||||
ts := c.tipsets[i]
|
||||
for _, cid := range ts.Cids() {
|
||||
err := f(cid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -2,6 +2,7 @@ package splitstore
|
||||
|
||||
import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
@ -28,7 +29,81 @@ func OpenTrackingStore(path string, ttype string) (TrackingStore, error) {
|
||||
switch ttype {
|
||||
case "", "bolt":
|
||||
return OpenBoltTrackingStore(filepath.Join(path, "tracker.bolt"))
|
||||
case "mem":
|
||||
return NewMemTrackingStore(), nil
|
||||
default:
|
||||
return nil, xerrors.Errorf("unknown tracking store type %s", ttype)
|
||||
}
|
||||
}
|
||||
|
||||
// NewMemTrackingStore creates an in-memory tracking store.
|
||||
// This is only useful for test or situations where you don't want to open the
|
||||
// real tracking store (eg concurrent read only access on a node's datastore)
|
||||
func NewMemTrackingStore() *MemTrackingStore {
|
||||
return &MemTrackingStore{tab: make(map[cid.Cid]abi.ChainEpoch)}
|
||||
}
|
||||
|
||||
// MemTrackingStore is a simple in-memory tracking store
|
||||
type MemTrackingStore struct {
|
||||
sync.Mutex
|
||||
tab map[cid.Cid]abi.ChainEpoch
|
||||
}
|
||||
|
||||
var _ TrackingStore = (*MemTrackingStore)(nil)
|
||||
|
||||
func (s *MemTrackingStore) Put(cid cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
s.tab[cid] = epoch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) PutBatch(cids []cid.Cid, epoch abi.ChainEpoch) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
s.tab[cid] = epoch
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Get(cid cid.Cid) (abi.ChainEpoch, error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
epoch, ok := s.tab[cid]
|
||||
if ok {
|
||||
return epoch, nil
|
||||
}
|
||||
return 0, xerrors.Errorf("missing tracking epoch for %s", cid)
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Delete(cid cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
delete(s.tab, cid)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) DeleteBatch(cids []cid.Cid) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for _, cid := range cids {
|
||||
delete(s.tab, cid)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) ForEach(f func(cid.Cid, abi.ChainEpoch) error) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for cid, epoch := range s.tab {
|
||||
err := f(cid, epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *MemTrackingStore) Sync() error { return nil }
|
||||
func (s *MemTrackingStore) Close() error { return nil }
|
||||
|
Loading…
Reference in New Issue
Block a user