package splitstore import ( "context" "fmt" "sync" "sync/atomic" "testing" "time" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/mock" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" logging "github.com/ipfs/go-log/v2" ) func init() { CompactionThreshold = 5 CompactionCold = 1 CompactionBoundary = 2 logging.SetLogLevel("splitstore", "DEBUG") } func testSplitStore(t *testing.T, cfg *Config) { chain := &mockChain{} // genesis genBlock := mock.MkBlock(nil, 0, 0) genTs := mock.TipSet(genBlock) chain.push(genTs) // the myriads of stores ds := dssync.MutexWrap(datastore.NewMapDatastore()) hot := blockstore.NewMemorySync() cold := blockstore.NewMemorySync() // put the genesis block to cold store blk, err := genBlock.ToStorageBlock() if err != nil { t.Fatal(err) } err = cold.Put(blk) if err != nil { t.Fatal(err) } // open the splitstore ss, err := Open("", ds, hot, cold, cfg) if err != nil { t.Fatal(err) } defer ss.Close() //nolint err = ss.Start(chain) if err != nil { t.Fatal(err) } // make some tipsets, but not enough to cause compaction mkBlock := func(curTs *types.TipSet, i int) *types.TipSet { blk := mock.MkBlock(curTs, uint64(i), uint64(i)) sblk, err := blk.ToStorageBlock() if err != nil { t.Fatal(err) } err = ss.Put(sblk) if err != nil { t.Fatal(err) } ts := mock.TipSet(blk) chain.push(ts) return ts } mkGarbageBlock := func(curTs *types.TipSet, i int) { blk := mock.MkBlock(curTs, uint64(i), uint64(i)) sblk, err := blk.ToStorageBlock() if err != nil { t.Fatal(err) } err = ss.Put(sblk) if err != nil { t.Fatal(err) } } waitForCompaction := func() { for atomic.LoadInt32(&ss.compacting) == 1 { time.Sleep(100 * time.Millisecond) } } curTs := genTs for i := 1; i < 5; i++ { curTs = mkBlock(curTs, i) waitForCompaction() } mkGarbageBlock(genTs, 1) // count objects in the cold and hot stores ctx, cancel := context.WithCancel(context.Background()) defer cancel() countBlocks := func(bs blockstore.Blockstore) int { count := 0 ch, err := bs.AllKeysChan(ctx) if err != nil { t.Fatal(err) } for range ch { count++ } return count } coldCnt := countBlocks(cold) hotCnt := countBlocks(hot) if coldCnt != 1 { t.Errorf("expected %d blocks, but got %d", 1, coldCnt) } if hotCnt != 5 { t.Errorf("expected %d blocks, but got %d", 5, hotCnt) } // trigger a compaction for i := 5; i < 10; i++ { curTs = mkBlock(curTs, i) waitForCompaction() } coldCnt = countBlocks(cold) hotCnt = countBlocks(hot) if !cfg.EnableFullCompaction { if coldCnt != 5 { t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt) } if hotCnt != 5 { t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt) } } if cfg.EnableFullCompaction && !cfg.EnableGC { if coldCnt != 3 { t.Errorf("expected %d cold blocks, but got %d", 3, coldCnt) } if hotCnt != 7 { t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt) } } if cfg.EnableFullCompaction && cfg.EnableGC { if coldCnt != 2 { t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt) } if hotCnt != 7 { t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt) } } } func TestSplitStoreSimpleCompaction(t *testing.T) { testSplitStore(t, &Config{TrackingStoreType: "mem"}) } func TestSplitStoreFullCompactionWithoutGC(t *testing.T) { testSplitStore(t, &Config{ TrackingStoreType: "mem", EnableFullCompaction: true, }) } func TestSplitStoreFullCompactionWithGC(t *testing.T) { testSplitStore(t, &Config{ TrackingStoreType: "mem", EnableFullCompaction: true, EnableGC: true, }) } type mockChain struct { sync.Mutex tipsets []*types.TipSet listener func(revert []*types.TipSet, apply []*types.TipSet) error } func (c *mockChain) push(ts *types.TipSet) { c.Lock() c.tipsets = append(c.tipsets, ts) c.Unlock() if c.listener != nil { err := c.listener(nil, []*types.TipSet{ts}) if err != nil { log.Errorf("mockchain: error dispatching listener: %s", err) } } } func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) { c.Lock() defer c.Unlock() iEpoch := int(epoch) if iEpoch > len(c.tipsets) { return nil, fmt.Errorf("bad epoch %d", epoch) } return c.tipsets[iEpoch-1], nil } func (c *mockChain) GetHeaviestTipSet() *types.TipSet { c.Lock() defer c.Unlock() return c.tipsets[len(c.tipsets)-1] } func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) { c.listener = change } func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error { c.Lock() defer c.Unlock() start := int(ts.Height()) - 1 end := start - int(epochs) if end < 0 { end = -1 } for i := start; i > end; i-- { ts := c.tipsets[i] for _, cid := range ts.Cids() { err := f(cid) if err != nil { return err } } } return nil }