lotus/blockstore/splitstore/splitstore_test.go

780 lines
17 KiB
Go
Raw Normal View History

2022-08-29 14:25:30 +00:00
// stm: #unit
2021-03-05 17:55:32 +00:00
package splitstore
import (
"context"
2021-07-04 09:43:05 +00:00
"errors"
2021-03-05 17:55:32 +00:00
"fmt"
2022-02-04 14:16:34 +00:00
"math/rand"
2021-03-05 17:55:32 +00:00
"sync"
"sync/atomic"
2021-03-05 17:55:32 +00:00
"testing"
"time"
2021-03-13 10:00:28 +00:00
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
ipld "github.com/ipfs/go-ipld-format"
2021-03-05 17:55:32 +00:00
logging "github.com/ipfs/go-log/v2"
2022-01-30 13:33:30 +00:00
mh "github.com/multiformats/go-multihash"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
2021-03-05 17:55:32 +00:00
)
func init() {
CompactionThreshold = 5
CompactionBoundary = 2
WarmupBoundary = 0
2022-02-01 08:55:15 +00:00
SyncWaitTime = time.Millisecond
2021-03-05 17:55:32 +00:00
logging.SetLogLevel("splitstore", "DEBUG")
}
func testSplitStore(t *testing.T, cfg *Config) {
2021-12-14 15:17:30 +00:00
ctx := context.Background()
chain := &mockChain{t: t}
fmt.Printf("Config: %v\n", cfg)
2021-03-05 17:55:32 +00:00
// the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore())
2021-07-04 09:43:05 +00:00
hot := newMockStore()
cold := newMockStore()
2021-03-05 17:55:32 +00:00
2021-03-13 10:00:28 +00:00
// this is necessary to avoid the garbage mock puts in the blocks
garbage := blocks.NewBlock([]byte{1, 2, 3})
2021-12-14 15:17:30 +00:00
err := cold.Put(ctx, garbage)
2021-03-13 10:00:28 +00:00
if err != nil {
t.Fatal(err)
}
// genesis
genBlock := mock.MkBlock(nil, 0, 0)
genBlock.Messages = garbage.Cid()
genBlock.ParentMessageReceipts = garbage.Cid()
genBlock.ParentStateRoot = garbage.Cid()
genBlock.Timestamp = uint64(time.Now().Unix())
2021-03-13 10:00:28 +00:00
genTs := mock.TipSet(genBlock)
chain.push(genTs)
2021-03-05 17:55:32 +00:00
// put the genesis block to cold store
blk, err := genBlock.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = cold.Put(ctx, blk)
2021-03-05 17:55:32 +00:00
if err != nil {
t.Fatal(err)
}
2021-07-17 17:40:13 +00:00
// create a garbage block that is protected with a rgistered protector
protected := blocks.NewBlock([]byte("protected!"))
2021-12-14 15:17:30 +00:00
err = hot.Put(ctx, protected)
2021-07-17 17:40:13 +00:00
if err != nil {
t.Fatal(err)
}
// and another one that is not protected
unprotected := blocks.NewBlock([]byte("unprotected!"))
2021-12-14 15:17:30 +00:00
err = hot.Put(ctx, unprotected)
2021-07-17 17:40:13 +00:00
if err != nil {
t.Fatal(err)
}
path := t.TempDir()
2021-03-05 17:55:32 +00:00
// open the splitstore
ss, err := Open(path, ds, hot, cold, cfg)
2021-03-05 17:55:32 +00:00
if err != nil {
t.Fatal(err)
}
2021-03-05 18:05:32 +00:00
defer ss.Close() //nolint
2021-03-05 17:55:32 +00:00
2021-07-17 17:40:13 +00:00
// register our protector
ss.AddProtector(func(protect func(cid.Cid) error) error {
return protect(protected.Cid())
})
err = ss.Start(chain, nil)
2021-03-05 17:55:32 +00:00
if err != nil {
t.Fatal(err)
}
// make some tipsets, but not enough to cause compaction
2021-07-04 09:56:01 +00:00
mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
2021-03-05 17:55:32 +00:00
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
2021-03-13 10:00:28 +00:00
blk.Messages = garbage.Cid()
blk.ParentMessageReceipts = garbage.Cid()
2021-07-04 09:56:01 +00:00
blk.ParentStateRoot = stateRoot.Cid()
blk.Timestamp = uint64(time.Now().Unix())
2021-03-13 10:00:28 +00:00
2021-03-05 17:55:32 +00:00
sblk, err := blk.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = ss.Put(ctx, stateRoot)
2021-07-04 09:56:01 +00:00
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = ss.Put(ctx, sblk)
2021-03-05 17:55:32 +00:00
if err != nil {
t.Fatal(err)
}
ts := mock.TipSet(blk)
chain.push(ts)
return ts
}
waitForCompaction := func() {
2022-02-01 09:13:01 +00:00
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
}
}
2021-03-05 17:55:32 +00:00
curTs := genTs
for i := 1; i < 5; i++ {
2021-07-04 09:56:01 +00:00
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
2021-03-05 17:55:32 +00:00
}
// count objects in the cold and hot stores
countBlocks := func(bs blockstore.Blockstore) int {
count := 0
2021-07-04 10:17:31 +00:00
_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
2021-03-05 17:55:32 +00:00
count++
2021-07-04 09:43:05 +00:00
return nil
})
2021-03-05 17:55:32 +00:00
return count
}
coldCnt := countBlocks(cold)
hotCnt := countBlocks(hot)
2021-03-13 10:00:28 +00:00
if coldCnt != 2 {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
2021-03-05 17:55:32 +00:00
}
2021-07-17 17:40:13 +00:00
if hotCnt != 12 {
t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
2021-03-05 17:55:32 +00:00
}
// trigger a compaction
for i := 5; i < 10; i++ {
2021-07-04 09:56:01 +00:00
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
2021-03-05 17:55:32 +00:00
}
coldCnt = countBlocks(cold)
hotCnt = countBlocks(hot)
2021-07-17 17:40:13 +00:00
if coldCnt != 6 {
t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt)
}
if hotCnt != 18 {
t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt)
}
// ensure our protected block is still there
2021-12-14 15:17:30 +00:00
has, err := hot.Has(ctx, protected.Cid())
2021-07-17 17:40:13 +00:00
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatal("protected block is missing from hotstore")
}
// ensure our unprotected block is in the coldstore now
2021-12-14 15:17:30 +00:00
has, err = hot.Has(ctx, unprotected.Cid())
2021-07-17 17:40:13 +00:00
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("unprotected block is still in hotstore")
}
2021-12-14 15:17:30 +00:00
has, err = cold.Has(ctx, unprotected.Cid())
2021-07-17 17:40:13 +00:00
if err != nil {
t.Fatal(err)
2021-03-05 17:55:32 +00:00
}
2021-07-17 17:40:13 +00:00
if !has {
t.Fatal("unprotected block is missing from coldstore")
2021-03-05 17:55:32 +00:00
}
// Make sure we can revert without panicking.
chain.revert(2)
2021-03-05 17:55:32 +00:00
}
2021-07-04 09:43:05 +00:00
func TestSplitStoreCompaction(t *testing.T) {
feat: Add additional test annotations (#8272) * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate repub and selection tests * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors * Fix markset annotations * doc: test annotations for the markets dagstore wrapper * Annotate miner_api test in dagstore * Annotate more test files * Remove bad annotations from fsrepo * Annotate wdpost system * Remove bad annotations * Renamce "conformance" to "chaos_actor" tests * doc: stm annotations for blockheader & election proof tests * Annotate remaining "A" tests * annotate: stm for error_test * memrepo_test.go * Annotate "b" file tests * message_test.go * doc: stm annotate for fsrepo_test * Annotate "c" file tests * Annotate "D" test files * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate u,v,wl tests * doc: stm annotations for various test files * Annotate "T" test files * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add mempool unit tests * Add two more memPool Add tests * Update submodules * Add check function tests * Add stm annotations, refactor test helper * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset solving merge conflicts * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors solve merge conflict * Fix markset annotations * Annotate miner_api test in dagstore * Annotate more test files * doc: test annotations for the markets dagstore wrapper * Annotate wdpost system * Renamce "conformance" to "chaos_actor" tests * Annotate remaining "A" tests * doc: stm annotations for blockheader & election proof tests * annotate: stm for error_test * Annotate "b" file tests * memrepo_test.go * Annotate "c" file tests * message_test.go * Annotate "D" test files * doc: stm annotate for fsrepo_test * Annotate u,v,wl tests * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate "T" test files * doc: stm annotations for various test files * Add mempool unit tests solve merge conflict * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add two more memPool Add tests * Update submodules * Add check function tests solve conflict * Add stm annotations, refactor test helper solve merge conflict * Change CLI test kinds to "unit" * Fix double merged test * Fix ccupgrade_test merge * Fix lint issues * Add stm annotation to types_Test * Test vectors submodule * Add file annotation to burn_test Co-authored-by: Nikola Divic <divicnikola@gmail.com> Co-authored-by: TheMenko <themenkoprojects@gmail.com>
2022-03-16 17:37:34 +00:00
//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
testSplitStore(t, &Config{MarkSetType: "map", UniversalColdBlocks: true})
2021-03-05 17:55:32 +00:00
}
func TestSplitStoreCompactionWithBadger(t *testing.T) {
feat: Add additional test annotations (#8272) * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate repub and selection tests * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors * Fix markset annotations * doc: test annotations for the markets dagstore wrapper * Annotate miner_api test in dagstore * Annotate more test files * Remove bad annotations from fsrepo * Annotate wdpost system * Remove bad annotations * Renamce "conformance" to "chaos_actor" tests * doc: stm annotations for blockheader & election proof tests * Annotate remaining "A" tests * annotate: stm for error_test * memrepo_test.go * Annotate "b" file tests * message_test.go * doc: stm annotate for fsrepo_test * Annotate "c" file tests * Annotate "D" test files * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate u,v,wl tests * doc: stm annotations for various test files * Annotate "T" test files * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add mempool unit tests * Add two more memPool Add tests * Update submodules * Add check function tests * Add stm annotations, refactor test helper * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset solving merge conflicts * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors solve merge conflict * Fix markset annotations * Annotate miner_api test in dagstore * Annotate more test files * doc: test annotations for the markets dagstore wrapper * Annotate wdpost system * Renamce "conformance" to "chaos_actor" tests * Annotate remaining "A" tests * doc: stm annotations for blockheader & election proof tests * annotate: stm for error_test * Annotate "b" file tests * memrepo_test.go * Annotate "c" file tests * message_test.go * Annotate "D" test files * doc: stm annotate for fsrepo_test * Annotate u,v,wl tests * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate "T" test files * doc: stm annotations for various test files * Add mempool unit tests solve merge conflict * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add two more memPool Add tests * Update submodules * Add check function tests solve conflict * Add stm annotations, refactor test helper solve merge conflict * Change CLI test kinds to "unit" * Fix double merged test * Fix ccupgrade_test merge * Fix lint issues * Add stm annotation to types_Test * Test vectors submodule * Add file annotation to burn_test Co-authored-by: Nikola Divic <divicnikola@gmail.com> Co-authored-by: TheMenko <themenkoprojects@gmail.com>
2022-03-16 17:37:34 +00:00
//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
bs := badgerMarkSetBatchSize
badgerMarkSetBatchSize = 1
t.Cleanup(func() {
badgerMarkSetBatchSize = bs
})
testSplitStore(t, &Config{MarkSetType: "badger", UniversalColdBlocks: true})
}
func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
feat: Add additional test annotations (#8272) * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate repub and selection tests * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors * Fix markset annotations * doc: test annotations for the markets dagstore wrapper * Annotate miner_api test in dagstore * Annotate more test files * Remove bad annotations from fsrepo * Annotate wdpost system * Remove bad annotations * Renamce "conformance" to "chaos_actor" tests * doc: stm annotations for blockheader & election proof tests * Annotate remaining "A" tests * annotate: stm for error_test * memrepo_test.go * Annotate "b" file tests * message_test.go * doc: stm annotate for fsrepo_test * Annotate "c" file tests * Annotate "D" test files * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate u,v,wl tests * doc: stm annotations for various test files * Annotate "T" test files * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add mempool unit tests * Add two more memPool Add tests * Update submodules * Add check function tests * Add stm annotations, refactor test helper * Annotate api,proxy_util,blockstore_badger, policy tests * Annotate splitstore: bsbadger / markset solving merge conflicts * Annotate splitstore feature * Annotate union/timed blockstore tests * Annotate openrpc, diff_adt tests * Annotate error,drand,events tests * Annotate predicates_test * Fix annotations * Annotate tscache, gen tests * Annotate fundmanager test * Annotate statetree_test * Annotate forks_test * Annotate searchwait_test.go * Fix duplicated @@ symbols * Annotate chain stmgr/store tests * Annotate more (types) tests * More tests annotated * Annotate conformance chaos actor tests * Annotate more integration tests * Annotate journal system tests * Annotate more tests. * Annotate gas,head buffer behaviors solve merge conflict * Fix markset annotations * Annotate miner_api test in dagstore * Annotate more test files * doc: test annotations for the markets dagstore wrapper * Annotate wdpost system * Renamce "conformance" to "chaos_actor" tests * Annotate remaining "A" tests * doc: stm annotations for blockheader & election proof tests * annotate: stm for error_test * Annotate "b" file tests * memrepo_test.go * Annotate "c" file tests * message_test.go * Annotate "D" test files * doc: stm annotate for fsrepo_test * Annotate u,v,wl tests * message_test.go * doc: stm annotate for chain, node/config & client * docs: stm annotate node_test * Annotate "T" test files * doc: stm annotations for various test files * Add mempool unit tests solve merge conflict * doc: stm annotate for proxy_util_test & policy_test * doc: stm annotate for various tests * doc: final few stm annotations * Add two more memPool Add tests * Update submodules * Add check function tests solve conflict * Add stm annotations, refactor test helper solve merge conflict * Change CLI test kinds to "unit" * Fix double merged test * Fix ccupgrade_test merge * Fix lint issues * Add stm annotation to types_Test * Test vectors submodule * Add file annotation to burn_test Co-authored-by: Nikola Divic <divicnikola@gmail.com> Co-authored-by: TheMenko <themenkoprojects@gmail.com>
2022-03-16 17:37:34 +00:00
//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
2021-12-14 15:17:30 +00:00
ctx := context.Background()
chain := &mockChain{t: t}
// the myriads of stores
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()
// this is necessary to avoid the garbage mock puts in the blocks
garbage := blocks.NewBlock([]byte{1, 2, 3})
2021-12-14 15:17:30 +00:00
err := cold.Put(ctx, garbage)
if err != nil {
t.Fatal(err)
}
// genesis
genBlock := mock.MkBlock(nil, 0, 0)
genBlock.Messages = garbage.Cid()
genBlock.ParentMessageReceipts = garbage.Cid()
genBlock.ParentStateRoot = garbage.Cid()
genBlock.Timestamp = uint64(time.Now().Unix())
genTs := mock.TipSet(genBlock)
chain.push(genTs)
// put the genesis block to cold store
blk, err := genBlock.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = cold.Put(ctx, blk)
if err != nil {
t.Fatal(err)
}
path := t.TempDir()
// open the splitstore
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint
// create an upgrade schedule that will suppress compaction during the test
upgradeBoundary = 0
upgrade := stmgr.Upgrade{
Height: 10,
PreMigrations: []stmgr.PreMigration{{StartWithin: 10}},
}
err = ss.Start(chain, []stmgr.Upgrade{upgrade})
if err != nil {
t.Fatal(err)
}
mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
blk := mock.MkBlock(curTs, uint64(i), uint64(i))
blk.Messages = garbage.Cid()
blk.ParentMessageReceipts = garbage.Cid()
blk.ParentStateRoot = stateRoot.Cid()
blk.Timestamp = uint64(time.Now().Unix())
sblk, err := blk.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = ss.Put(ctx, stateRoot)
if err != nil {
t.Fatal(err)
}
2021-12-14 15:17:30 +00:00
err = ss.Put(ctx, sblk)
if err != nil {
t.Fatal(err)
}
ts := mock.TipSet(blk)
chain.push(ts)
return ts
}
waitForCompaction := func() {
2022-02-01 09:13:01 +00:00
ss.txnSyncMx.Lock()
ss.txnSync = true
ss.txnSyncCond.Broadcast()
ss.txnSyncMx.Unlock()
for atomic.LoadInt32(&ss.compacting) == 1 {
time.Sleep(100 * time.Millisecond)
}
}
curTs := genTs
for i := 1; i < 10; i++ {
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
}
countBlocks := func(bs blockstore.Blockstore) int {
count := 0
_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
count++
return nil
})
return count
}
// we should not have compacted due to suppression and everything should still be hot
hotCnt := countBlocks(hot)
coldCnt := countBlocks(cold)
if hotCnt != 20 {
t.Errorf("expected %d blocks, but got %d", 20, hotCnt)
}
if coldCnt != 2 {
t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
}
// put some more blocks, now we should compact
for i := 10; i < 20; i++ {
stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
curTs = mkBlock(curTs, i, stateRoot)
waitForCompaction()
}
hotCnt = countBlocks(hot)
coldCnt = countBlocks(cold)
if hotCnt != 24 {
t.Errorf("expected %d blocks, but got %d", 24, hotCnt)
}
if coldCnt != 18 {
t.Errorf("expected %d blocks, but got %d", 18, coldCnt)
}
}
2022-02-04 14:16:34 +00:00
func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()
mkRandomBlock := func() blocks.Block {
data := make([]byte, 128)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
return blocks.NewBlock(data)
}
block1 := mkRandomBlock()
block2 := mkRandomBlock()
block3 := mkRandomBlock()
hdr := mock.MkBlock(nil, 0, 0)
hdr.Messages = block1.Cid()
hdr.ParentMessageReceipts = block2.Cid()
hdr.ParentStateRoot = block3.Cid()
block4, err := hdr.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
allBlocks := []blocks.Block{block1, block2, block3, block4}
for _, blk := range allBlocks {
err := cold.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
}
path := t.TempDir()
2022-02-04 14:16:34 +00:00
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
2022-02-04 14:16:34 +00:00
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint
ss.warmupEpoch = 1
go ss.reifyOrchestrator()
waitForReification := func() {
for {
ss.reifyMx.Lock()
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
ss.reifyMx.Unlock()
if ready {
return
}
time.Sleep(time.Millisecond)
}
}
// first access using the standard view
err = f(context.Background(), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
// nothing should be reified
waitForReification()
for _, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("block unexpectedly reified")
}
}
// now make the hot/reifying view and ensure access reifies
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
// everything should be reified
waitForReification()
for i, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if !has {
t.Fatalf("block%d was not reified", i+1)
}
}
}
2022-02-18 10:19:19 +00:00
func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
ds := dssync.MutexWrap(datastore.NewMapDatastore())
hot := newMockStore()
cold := newMockStore()
mkRandomBlock := func() blocks.Block {
data := make([]byte, 128)
_, err := rand.Read(data)
if err != nil {
t.Fatal(err)
}
return blocks.NewBlock(data)
}
block1 := mkRandomBlock()
block2 := mkRandomBlock()
block3 := mkRandomBlock()
hdr := mock.MkBlock(nil, 0, 0)
hdr.Messages = block1.Cid()
hdr.ParentMessageReceipts = block2.Cid()
hdr.ParentStateRoot = block3.Cid()
block4, err := hdr.ToStorageBlock()
if err != nil {
t.Fatal(err)
}
allBlocks := []blocks.Block{block1, block2, block3, block4}
for _, blk := range allBlocks {
err := cold.Put(context.Background(), blk)
if err != nil {
t.Fatal(err)
}
}
path := t.TempDir()
2022-02-18 10:19:19 +00:00
ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
2022-02-18 10:19:19 +00:00
if err != nil {
t.Fatal(err)
}
defer ss.Close() //nolint
ss.warmupEpoch = 1
go ss.reifyOrchestrator()
waitForReification := func() {
for {
ss.reifyMx.Lock()
ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
ss.reifyMx.Unlock()
if ready {
return
}
time.Sleep(time.Millisecond)
}
}
// do a hot access -- nothing should be reified as the limit should be exceeded
oldReifyLimit := ReifyLimit
ReifyLimit = 2
t.Cleanup(func() {
ReifyLimit = oldReifyLimit
})
err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
if err != nil {
t.Fatal(err)
}
waitForReification()
for _, blk := range allBlocks {
has, err := hot.Has(context.Background(), blk.Cid())
if err != nil {
t.Fatal(err)
}
if has {
t.Fatal("block unexpectedly reified")
}
}
}
2022-02-04 14:16:34 +00:00
func TestSplitStoreReification(t *testing.T) {
t.Log("test reification with Has")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
return err
})
t.Log("test reification with Get")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Get(ctx, c)
return err
})
t.Log("test reification with GetSize")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.GetSize(ctx, c)
return err
})
t.Log("test reification with View")
testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
return s.View(ctx, c, func(_ []byte) error { return nil })
})
2022-02-18 10:19:19 +00:00
t.Log("test reification limit")
testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
_, err := s.Has(ctx, c)
return err
})
2022-02-04 14:16:34 +00:00
}
2021-03-05 17:55:32 +00:00
type mockChain struct {
t testing.TB
2021-03-05 17:55:32 +00:00
sync.Mutex
2021-03-19 10:17:32 +00:00
genesis *types.BlockHeader
2021-03-05 17:55:32 +00:00
tipsets []*types.TipSet
listener func(revert []*types.TipSet, apply []*types.TipSet) error
}
func (c *mockChain) push(ts *types.TipSet) {
c.Lock()
c.tipsets = append(c.tipsets, ts)
2021-03-19 10:17:32 +00:00
if c.genesis == nil {
c.genesis = ts.Blocks()[0]
}
2021-03-05 17:55:32 +00:00
c.Unlock()
if c.listener != nil {
err := c.listener(nil, []*types.TipSet{ts})
if err != nil {
c.t.Errorf("mockchain: error dispatching listener: %s", err)
}
}
}
func (c *mockChain) revert(count int) {
c.Lock()
revert := make([]*types.TipSet, count)
if count > len(c.tipsets) {
c.Unlock()
c.t.Fatalf("not enough tipsets to revert")
}
copy(revert, c.tipsets[len(c.tipsets)-count:])
c.tipsets = c.tipsets[:len(c.tipsets)-count]
c.Unlock()
if c.listener != nil {
err := c.listener(revert, nil)
if err != nil {
c.t.Errorf("mockchain: error dispatching listener: %s", err)
2021-03-05 17:55:32 +00:00
}
}
}
func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
c.Lock()
defer c.Unlock()
iEpoch := int(epoch)
if iEpoch > len(c.tipsets) {
return nil, fmt.Errorf("bad epoch %d", epoch)
}
2021-03-13 10:00:28 +00:00
return c.tipsets[iEpoch], nil
2021-03-05 17:55:32 +00:00
}
func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
c.Lock()
defer c.Unlock()
return c.tipsets[len(c.tipsets)-1]
}
func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
c.listener = change
}
2021-07-04 09:43:05 +00:00
type mockStore struct {
mx sync.Mutex
2022-01-30 13:33:30 +00:00
set map[string]blocks.Block
2021-07-04 09:43:05 +00:00
}
func newMockStore() *mockStore {
2022-01-30 13:33:30 +00:00
return &mockStore{set: make(map[string]blocks.Block)}
}
func (b *mockStore) keyOf(c cid.Cid) string {
return string(c.Hash())
}
func (b *mockStore) cidOf(k string) cid.Cid {
return cid.NewCidV1(cid.Raw, mh.Multihash([]byte(k)))
2021-07-04 09:43:05 +00:00
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
2022-01-30 13:33:30 +00:00
_, ok := b.set[b.keyOf(cid)]
2021-07-04 09:43:05 +00:00
return ok, nil
}
func (b *mockStore) HashOnRead(hor bool) {}
2021-12-14 15:17:30 +00:00
func (b *mockStore) Get(_ context.Context, cid cid.Cid) (blocks.Block, error) {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
2022-01-30 13:33:30 +00:00
blk, ok := b.set[b.keyOf(cid)]
2021-07-04 09:43:05 +00:00
if !ok {
return nil, ipld.ErrNotFound{Cid: cid}
2021-07-04 09:43:05 +00:00
}
return blk, nil
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
blk, err := b.Get(ctx, cid)
2021-07-04 09:43:05 +00:00
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) View(ctx context.Context, cid cid.Cid, f func([]byte) error) error {
blk, err := b.Get(ctx, cid)
2021-07-04 09:43:05 +00:00
if err != nil {
return err
}
return f(blk.RawData())
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) Put(_ context.Context, blk blocks.Block) error {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
2022-01-30 13:33:30 +00:00
b.set[b.keyOf(blk.Cid())] = blk
2021-07-04 09:43:05 +00:00
return nil
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) PutMany(_ context.Context, blks []blocks.Block) error {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
for _, blk := range blks {
2022-01-30 13:33:30 +00:00
b.set[b.keyOf(blk.Cid())] = blk
2021-07-04 09:43:05 +00:00
}
return nil
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) DeleteBlock(_ context.Context, cid cid.Cid) error {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
2022-01-30 13:33:30 +00:00
delete(b.set, b.keyOf(cid))
2021-07-04 09:43:05 +00:00
return nil
}
2021-12-14 15:17:30 +00:00
func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error {
2021-07-04 09:43:05 +00:00
b.mx.Lock()
defer b.mx.Unlock()
for _, c := range cids {
2022-01-30 13:33:30 +00:00
delete(b.set, b.keyOf(c))
2021-07-04 09:43:05 +00:00
}
return nil
}
func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}
func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
b.mx.Lock()
defer b.mx.Unlock()
for c := range b.set {
2022-01-30 13:33:30 +00:00
err := f(b.cidOf(c))
2021-07-04 09:43:05 +00:00
if err != nil {
return err
}
}
return nil
}
func (b *mockStore) Close() error {
return nil
}