// lotus/blockstore/splitstore/splitstore_test.go

package splitstore

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/filecoin-project/go-state-types/abi"
	"github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/types/mock"

	cid "github.com/ipfs/go-cid"
	datastore "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	logging "github.com/ipfs/go-log/v2"
)
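
// lower the compaction parameters so that compaction triggers after only a
// handful of tipsets, and surface the splitstore debug logs.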
func init() {
	CompactionThreshold = 5
	CompactionCold = 1
	CompactionBoundary = 2
	logging.SetLogLevel("splitstore", "DEBUG")
}
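
// testSplitStore exercises a SplitStore backed by in-memory hot and cold
// stores against a mock chain, checking how blocks are distributed between
// the stores for the given Config.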
func testSplitStore(t *testing.T, cfg *Config) {
	chain := &mockChain{t: t}

	// genesis
	genBlock := mock.MkBlock(nil, 0, 0)
	genTs := mock.TipSet(genBlock)
	chain.push(genTs)

	// the myriad of stores
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	hot := blockstore.NewMemorySync()
	cold := blockstore.NewMemorySync()

	// put the genesis block in the cold store
	blk, err := genBlock.ToStorageBlock()
	if err != nil {
		t.Fatal(err)
	}

	err = cold.Put(blk)
	if err != nil {
		t.Fatal(err)
	}

	// open the splitstore
	ss, err := Open("", ds, hot, cold, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer ss.Close() //nolint

	err = ss.Start(chain)
	if err != nil {
		t.Fatal(err)
	}

	// make some tipsets, but not enough to cause compaction
	mkBlock := func(curTs *types.TipSet, i int) *types.TipSet {
		blk := mock.MkBlock(curTs, uint64(i), uint64(i))
		sblk, err := blk.ToStorageBlock()
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(sblk)
		if err != nil {
			t.Fatal(err)
		}
		ts := mock.TipSet(blk)
		chain.push(ts)
		return ts
	}
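
	// mkGarbageBlock writes a block to the splitstore without pushing it to
	// the chain, so it is not reachable from any tipset on the chain.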
	mkGarbageBlock := func(curTs *types.TipSet, i int) {
		blk := mock.MkBlock(curTs, uint64(i), uint64(i))
		sblk, err := blk.ToStorageBlock()
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(sblk)
		if err != nil {
			t.Fatal(err)
		}
	}
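
	// waitForCompaction spins until the background compaction goroutine has
	// cleared the ss.compacting flag.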
	waitForCompaction := func() {
		for atomic.LoadInt32(&ss.compacting) == 1 {
			time.Sleep(100 * time.Millisecond)
		}
	}

	curTs := genTs
	for i := 1; i < 5; i++ {
		curTs = mkBlock(curTs, i)
		waitForCompaction()
	}

	mkGarbageBlock(genTs, 1)

	// count objects in the cold and hot stores
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	countBlocks := func(bs blockstore.Blockstore) int {
		count := 0
		ch, err := bs.AllKeysChan(ctx)
		if err != nil {
			t.Fatal(err)
		}
		for range ch {
			count++
		}
		return count
	}

	coldCnt := countBlocks(cold)
	hotCnt := countBlocks(hot)

	if coldCnt != 1 {
		t.Errorf("expected %d blocks, but got %d", 1, coldCnt)
	}

	if hotCnt != 5 {
		t.Errorf("expected %d blocks, but got %d", 5, hotCnt)
	}

	// trigger a compaction
	for i := 5; i < 10; i++ {
		curTs = mkBlock(curTs, i)
		waitForCompaction()
	}

	coldCnt = countBlocks(cold)
	hotCnt = countBlocks(hot)
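
	// the expected counts depend on the compaction mode: full compaction
	// keeps more blocks hot, and GC additionally purges the unreachable
	// garbage block from the cold store.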
	if !cfg.EnableFullCompaction {
		if coldCnt != 5 {
			t.Errorf("expected %d cold blocks, but got %d", 5, coldCnt)
		}

		if hotCnt != 5 {
			t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt)
		}
	}

	if cfg.EnableFullCompaction && !cfg.EnableGC {
		if coldCnt != 3 {
			t.Errorf("expected %d cold blocks, but got %d", 3, coldCnt)
		}

		if hotCnt != 7 {
			t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
		}
	}

	if cfg.EnableFullCompaction && cfg.EnableGC {
		if coldCnt != 2 {
			t.Errorf("expected %d cold blocks, but got %d", 2, coldCnt)
		}

		if hotCnt != 7 {
			t.Errorf("expected %d hot blocks, but got %d", 7, hotCnt)
		}
	}

	// Make sure we can revert without panicking.
	chain.revert(2)
}

func TestSplitStoreSimpleCompaction(t *testing.T) {
	testSplitStore(t, &Config{TrackingStoreType: "mem"})
}

func TestSplitStoreFullCompactionWithoutGC(t *testing.T) {
	testSplitStore(t, &Config{
		TrackingStoreType:    "mem",
		EnableFullCompaction: true,
	})
}

func TestSplitStoreFullCompactionWithGC(t *testing.T) {
	testSplitStore(t, &Config{
		TrackingStoreType:    "mem",
		EnableFullCompaction: true,
		EnableGC:             true,
	})
}
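
// mockChain is a minimal in-memory chain implementation that tracks tipsets,
// reports the heaviest one, and notifies a single head-change listener on
// push and revert.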
type mockChain struct {
	t testing.TB

	sync.Mutex
	tipsets  []*types.TipSet
	listener func(revert []*types.TipSet, apply []*types.TipSet) error
}
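
// push appends a tipset to the chain and dispatches it to the head change
// listener as an apply.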
func (c *mockChain) push(ts *types.TipSet) {
	c.Lock()
	c.tipsets = append(c.tipsets, ts)
	c.Unlock()

	if c.listener != nil {
		err := c.listener(nil, []*types.TipSet{ts})
		if err != nil {
			c.t.Errorf("mockchain: error dispatching listener: %s", err)
		}
	}
}
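
// revert removes the last count tipsets from the chain and dispatches them to
// the head change listener as a revert.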
func (c *mockChain) revert(count int) {
	c.Lock()
	revert := make([]*types.TipSet, count)
	if count > len(c.tipsets) {
		c.Unlock()
		c.t.Fatalf("not enough tipsets to revert")
	}

	copy(revert, c.tipsets[len(c.tipsets)-count:])
	c.tipsets = c.tipsets[:len(c.tipsets)-count]
	c.Unlock()

	if c.listener != nil {
		err := c.listener(revert, nil)
		if err != nil {
			c.t.Errorf("mockchain: error dispatching listener: %s", err)
		}
	}
}
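
// GetTipsetByHeight treats the epoch as a 1-based index into the tipset
// slice; the mock chain has no null rounds.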
func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
	c.Lock()
	defer c.Unlock()

	iEpoch := int(epoch)
	if iEpoch > len(c.tipsets) {
		return nil, fmt.Errorf("bad epoch %d", epoch)
	}

	return c.tipsets[iEpoch-1], nil
}

func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
	c.Lock()
	defer c.Unlock()

	return c.tipsets[len(c.tipsets)-1]
}

func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
	c.listener = change
}
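
// WalkSnapshot invokes f on the block CIDs of the requested number of epochs,
// walking backwards from the parents of ts; this mirrors the chain walk the
// splitstore performs during compaction.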
func (c *mockChain) WalkSnapshot(_ context.Context, ts *types.TipSet, epochs abi.ChainEpoch, _ bool, _ bool, f func(cid.Cid) error) error {
	c.Lock()
	defer c.Unlock()

	start := int(ts.Height()) - 1
	end := start - int(epochs)
	if end < 0 {
		end = -1
	}
	for i := start; i > end; i-- {
		ts := c.tipsets[i]
		for _, cid := range ts.Cids() {
			err := f(cid)
			if err != nil {
				return err
			}
		}
	}

	return nil
}