234 lines
4.9 KiB
Go
234 lines
4.9 KiB
Go
package splitstore
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/lotus/blockstore"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/chain/types/mock"
|
|
|
|
blocks "github.com/ipfs/go-block-format"
|
|
datastore "github.com/ipfs/go-datastore"
|
|
dssync "github.com/ipfs/go-datastore/sync"
|
|
logging "github.com/ipfs/go-log/v2"
|
|
)
|
|
|
|
func init() {
|
|
CompactionThreshold = 5
|
|
CompactionCold = 1
|
|
CompactionBoundary = 2
|
|
logging.SetLogLevel("splitstore", "DEBUG")
|
|
}
|
|
|
|
func testSplitStore(t *testing.T, cfg *Config) {
|
|
chain := &mockChain{t: t}
|
|
|
|
// the myriads of stores
|
|
ds := dssync.MutexWrap(datastore.NewMapDatastore())
|
|
hot := blockstore.NewMemorySync()
|
|
cold := blockstore.NewMemorySync()
|
|
|
|
// this is necessary to avoid the garbage mock puts in the blocks
|
|
garbage := blocks.NewBlock([]byte{1, 2, 3})
|
|
err := cold.Put(garbage)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// genesis
|
|
genBlock := mock.MkBlock(nil, 0, 0)
|
|
genBlock.Messages = garbage.Cid()
|
|
genBlock.ParentMessageReceipts = garbage.Cid()
|
|
genBlock.ParentStateRoot = garbage.Cid()
|
|
genBlock.Timestamp = uint64(time.Now().Unix())
|
|
|
|
genTs := mock.TipSet(genBlock)
|
|
chain.push(genTs)
|
|
|
|
// put the genesis block to cold store
|
|
blk, err := genBlock.ToStorageBlock()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
err = cold.Put(blk)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// open the splitstore
|
|
ss, err := Open("", ds, hot, cold, cfg)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
defer ss.Close() //nolint
|
|
|
|
err = ss.Start(chain)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
// make some tipsets, but not enough to cause compaction
|
|
mkBlock := func(curTs *types.TipSet, i int) *types.TipSet {
|
|
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
|
|
|
|
blk.Messages = garbage.Cid()
|
|
blk.ParentMessageReceipts = garbage.Cid()
|
|
blk.ParentStateRoot = garbage.Cid()
|
|
blk.Timestamp = uint64(time.Now().Unix())
|
|
|
|
sblk, err := blk.ToStorageBlock()
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
err = ss.Put(sblk)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
ts := mock.TipSet(blk)
|
|
chain.push(ts)
|
|
|
|
return ts
|
|
}
|
|
|
|
waitForCompaction := func() {
|
|
for atomic.LoadInt32(&ss.compacting) == 1 {
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
curTs := genTs
|
|
for i := 1; i < 5; i++ {
|
|
curTs = mkBlock(curTs, i)
|
|
waitForCompaction()
|
|
}
|
|
|
|
// count objects in the cold and hot stores
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
countBlocks := func(bs blockstore.Blockstore) int {
|
|
count := 0
|
|
ch, err := bs.AllKeysChan(ctx)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
for range ch {
|
|
count++
|
|
}
|
|
return count
|
|
}
|
|
|
|
coldCnt := countBlocks(cold)
|
|
hotCnt := countBlocks(hot)
|
|
|
|
if coldCnt != 2 {
|
|
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
|
|
}
|
|
|
|
if hotCnt != 5 {
|
|
t.Errorf("expected %d blocks, but got %d", 5, hotCnt)
|
|
}
|
|
|
|
// trigger a compaction
|
|
for i := 5; i < 10; i++ {
|
|
curTs = mkBlock(curTs, i)
|
|
waitForCompaction()
|
|
}
|
|
|
|
coldCnt = countBlocks(cold)
|
|
hotCnt = countBlocks(hot)
|
|
|
|
if coldCnt != 7 {
|
|
t.Errorf("expected %d cold blocks, but got %d", 7, coldCnt)
|
|
}
|
|
|
|
if hotCnt != 5 {
|
|
t.Errorf("expected %d hot blocks, but got %d", 5, hotCnt)
|
|
}
|
|
|
|
// Make sure we can revert without panicking.
|
|
chain.revert(2)
|
|
}
|
|
|
|
func TestSplitStoreSimpleCompaction(t *testing.T) {
|
|
testSplitStore(t, &Config{TrackingStoreType: "mem"})
|
|
}
|
|
|
|
type mockChain struct {
|
|
t testing.TB
|
|
|
|
sync.Mutex
|
|
genesis *types.BlockHeader
|
|
tipsets []*types.TipSet
|
|
listener func(revert []*types.TipSet, apply []*types.TipSet) error
|
|
}
|
|
|
|
func (c *mockChain) push(ts *types.TipSet) {
|
|
c.Lock()
|
|
c.tipsets = append(c.tipsets, ts)
|
|
if c.genesis == nil {
|
|
c.genesis = ts.Blocks()[0]
|
|
}
|
|
c.Unlock()
|
|
|
|
if c.listener != nil {
|
|
err := c.listener(nil, []*types.TipSet{ts})
|
|
if err != nil {
|
|
c.t.Errorf("mockchain: error dispatching listener: %s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *mockChain) revert(count int) {
|
|
c.Lock()
|
|
revert := make([]*types.TipSet, count)
|
|
if count > len(c.tipsets) {
|
|
c.Unlock()
|
|
c.t.Fatalf("not enough tipsets to revert")
|
|
}
|
|
copy(revert, c.tipsets[len(c.tipsets)-count:])
|
|
c.tipsets = c.tipsets[:len(c.tipsets)-count]
|
|
c.Unlock()
|
|
|
|
if c.listener != nil {
|
|
err := c.listener(revert, nil)
|
|
if err != nil {
|
|
c.t.Errorf("mockchain: error dispatching listener: %s", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *mockChain) GetGenesis() (*types.BlockHeader, error) {
|
|
return c.genesis, nil
|
|
}
|
|
|
|
func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
iEpoch := int(epoch)
|
|
if iEpoch > len(c.tipsets) {
|
|
return nil, fmt.Errorf("bad epoch %d", epoch)
|
|
}
|
|
|
|
return c.tipsets[iEpoch], nil
|
|
}
|
|
|
|
func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
return c.tipsets[len(c.tipsets)-1]
|
|
}
|
|
|
|
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
|
|
c.listener = change
|
|
}
|