// stm: #unit
package splitstore

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/ipfs/go-cid"
	"github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	ipld "github.com/ipfs/go-ipld-format"
	blocks "github.com/ipfs/go-libipfs/blocks"
	logging "github.com/ipfs/go-log/v2"
	mh "github.com/multiformats/go-multihash"

	"github.com/filecoin-project/go-state-types/abi"

	"github.com/filecoin-project/lotus/blockstore"
	"github.com/filecoin-project/lotus/chain/stmgr"
	"github.com/filecoin-project/lotus/chain/types"
	"github.com/filecoin-project/lotus/chain/types/mock"
)

func init() {
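	// Lower the package-level compaction tunables and the sync wait so that
	// these tests can trigger compaction after only a handful of tipsets.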
	CompactionThreshold = 5
	CompactionBoundary = 2
	WarmupBoundary = 0
	SyncWaitTime = time.Millisecond
	logging.SetLogLevel("splitstore", "DEBUG")
}

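// testSplitStore seeds mock hot and cold stores around a mock chain, registers
// a protector for one block, pushes enough tipsets to trigger compaction, and
// then checks the hot/cold block counts as well as the placement of the
// protected and unprotected blocks.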
func testSplitStore(t *testing.T, cfg *Config) {
	ctx := context.Background()
	chain := &mockChain{t: t}
	fmt.Printf("Config: %v\n", cfg)

	// the myriads of stores
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	hot := newMockStore()
	cold := newMockStore()

	// this is necessary to avoid the garbage the mock puts in the blocks
	garbage := blocks.NewBlock([]byte{1, 2, 3})
	err := cold.Put(ctx, garbage)
	if err != nil {
		t.Fatal(err)
	}

	// genesis
	genBlock := mock.MkBlock(nil, 0, 0)
	genBlock.Messages = garbage.Cid()
	genBlock.ParentMessageReceipts = garbage.Cid()
	genBlock.ParentStateRoot = garbage.Cid()
	genBlock.Timestamp = uint64(time.Now().Unix())

	genTs := mock.TipSet(genBlock)
	chain.push(genTs)

	// put the genesis block to cold store
	blk, err := genBlock.ToStorageBlock()
	if err != nil {
		t.Fatal(err)
	}

	err = cold.Put(ctx, blk)
	if err != nil {
		t.Fatal(err)
	}

	// create a garbage block that is protected with a registered protector
	protected := blocks.NewBlock([]byte("protected!"))
	err = hot.Put(ctx, protected)
	if err != nil {
		t.Fatal(err)
	}

	// and another one that is not protected
	unprotected := blocks.NewBlock([]byte("unprotected!"))
	err = hot.Put(ctx, unprotected)
	if err != nil {
		t.Fatal(err)
	}

	path := t.TempDir()

	// open the splitstore
	ss, err := Open(path, ds, hot, cold, cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer ss.Close() //nolint

	// register our protector
	ss.AddProtector(func(protect func(cid.Cid) error) error {
		return protect(protected.Cid())
	})

	err = ss.Start(chain, nil)
	if err != nil {
		t.Fatal(err)
	}

	// make some tipsets, but not enough to cause compaction
	mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
		blk := mock.MkBlock(curTs, uint64(i), uint64(i))

		blk.Messages = garbage.Cid()
		blk.ParentMessageReceipts = garbage.Cid()
		blk.ParentStateRoot = stateRoot.Cid()
		blk.Timestamp = uint64(time.Now().Unix())

		sblk, err := blk.ToStorageBlock()
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(ctx, stateRoot)
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(ctx, sblk)
		if err != nil {
			t.Fatal(err)
		}
		ts := mock.TipSet(blk)
		chain.push(ts)

		return ts
	}

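	// waitForCompaction releases the transactional sync gate and then polls
	// the compacting flag until any in-flight compaction has finished.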
	waitForCompaction := func() {
		ss.txnSyncMx.Lock()
		ss.txnSync = true
		ss.txnSyncCond.Broadcast()
		ss.txnSyncMx.Unlock()
		for atomic.LoadInt32(&ss.compacting) == 1 {
			time.Sleep(100 * time.Millisecond)
		}
	}

	curTs := genTs
	for i := 1; i < 5; i++ {
		stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
		curTs = mkBlock(curTs, i, stateRoot)
		waitForCompaction()
	}

	// count objects in the cold and hot stores
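	// countBlocks walks the blockstore keys through the BlockstoreIterator
	// interface and returns how many blocks the store holds.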
	countBlocks := func(bs blockstore.Blockstore) int {
		count := 0
		_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
			count++
			return nil
		})
		return count
	}

	coldCnt := countBlocks(cold)
	hotCnt := countBlocks(hot)

	if coldCnt != 2 {
		t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
	}

	if hotCnt != 12 {
		t.Errorf("expected %d blocks, but got %d", 12, hotCnt)
	}

	// trigger a compaction
	for i := 5; i < 10; i++ {
		stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
		curTs = mkBlock(curTs, i, stateRoot)
		waitForCompaction()
	}

	coldCnt = countBlocks(cold)
	hotCnt = countBlocks(hot)

	if coldCnt != 6 {
		t.Errorf("expected %d cold blocks, but got %d", 6, coldCnt)
	}

	if hotCnt != 18 {
		t.Errorf("expected %d hot blocks, but got %d", 18, hotCnt)
	}

	// ensure our protected block is still there
	has, err := hot.Has(ctx, protected.Cid())
	if err != nil {
		t.Fatal(err)
	}

	if !has {
		t.Fatal("protected block is missing from hotstore")
	}

	// ensure our unprotected block is in the coldstore now
	has, err = hot.Has(ctx, unprotected.Cid())
	if err != nil {
		t.Fatal(err)
	}

	if has {
		t.Fatal("unprotected block is still in hotstore")
	}

	has, err = cold.Has(ctx, unprotected.Cid())
	if err != nil {
		t.Fatal(err)
	}

	if !has {
		t.Fatal("unprotected block is missing from coldstore")
	}

	// Make sure we can revert without panicking.
	chain.revert(2)
}

func TestSplitStoreCompaction(t *testing.T) {
	//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
	//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
	//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
	testSplitStore(t, &Config{MarkSetType: "map", UniversalColdBlocks: true})
}

func TestSplitStoreCompactionWithBadger(t *testing.T) {
	//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
	//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
	//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
	bs := badgerMarkSetBatchSize
	badgerMarkSetBatchSize = 1
	t.Cleanup(func() {
		badgerMarkSetBatchSize = bs
	})
	testSplitStore(t, &Config{MarkSetType: "badger", UniversalColdBlocks: true})
}

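// TestSplitStoreSuppressCompactionNearUpgrade checks that compaction is held
// off while the chain head is near a scheduled upgrade and resumes once the
// upgrade epoch has passed.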
func TestSplitStoreSuppressCompactionNearUpgrade(t *testing.T) {
	//stm: @SPLITSTORE_SPLITSTORE_OPEN_001, @SPLITSTORE_SPLITSTORE_CLOSE_001
	//stm: @SPLITSTORE_SPLITSTORE_PUT_001, @SPLITSTORE_SPLITSTORE_ADD_PROTECTOR_001
	//stm: @SPLITSTORE_SPLITSTORE_CLOSE_001
	ctx := context.Background()
	chain := &mockChain{t: t}

	// the myriads of stores
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	hot := newMockStore()
	cold := newMockStore()

	// this is necessary to avoid the garbage the mock puts in the blocks
	garbage := blocks.NewBlock([]byte{1, 2, 3})
	err := cold.Put(ctx, garbage)
	if err != nil {
		t.Fatal(err)
	}

	// genesis
	genBlock := mock.MkBlock(nil, 0, 0)
	genBlock.Messages = garbage.Cid()
	genBlock.ParentMessageReceipts = garbage.Cid()
	genBlock.ParentStateRoot = garbage.Cid()
	genBlock.Timestamp = uint64(time.Now().Unix())

	genTs := mock.TipSet(genBlock)
	chain.push(genTs)

	// put the genesis block to cold store
	blk, err := genBlock.ToStorageBlock()
	if err != nil {
		t.Fatal(err)
	}

	err = cold.Put(ctx, blk)
	if err != nil {
		t.Fatal(err)
	}

	path := t.TempDir()

	// open the splitstore
	ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
	if err != nil {
		t.Fatal(err)
	}
	defer ss.Close() //nolint

	// create an upgrade schedule that will suppress compaction during the test
	upgradeBoundary = 0
	upgrade := stmgr.Upgrade{
		Height:        10,
		PreMigrations: []stmgr.PreMigration{{StartWithin: 10}},
	}

	err = ss.Start(chain, []stmgr.Upgrade{upgrade})
	if err != nil {
		t.Fatal(err)
	}

	mkBlock := func(curTs *types.TipSet, i int, stateRoot blocks.Block) *types.TipSet {
		blk := mock.MkBlock(curTs, uint64(i), uint64(i))

		blk.Messages = garbage.Cid()
		blk.ParentMessageReceipts = garbage.Cid()
		blk.ParentStateRoot = stateRoot.Cid()
		blk.Timestamp = uint64(time.Now().Unix())

		sblk, err := blk.ToStorageBlock()
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(ctx, stateRoot)
		if err != nil {
			t.Fatal(err)
		}
		err = ss.Put(ctx, sblk)
		if err != nil {
			t.Fatal(err)
		}
		ts := mock.TipSet(blk)
		chain.push(ts)

		return ts
	}

	waitForCompaction := func() {
		ss.txnSyncMx.Lock()
		ss.txnSync = true
		ss.txnSyncCond.Broadcast()
		ss.txnSyncMx.Unlock()
		for atomic.LoadInt32(&ss.compacting) == 1 {
			time.Sleep(100 * time.Millisecond)
		}
	}

	curTs := genTs
	for i := 1; i < 10; i++ {
		stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
		curTs = mkBlock(curTs, i, stateRoot)
		waitForCompaction()
	}

	countBlocks := func(bs blockstore.Blockstore) int {
		count := 0
		_ = bs.(blockstore.BlockstoreIterator).ForEachKey(func(_ cid.Cid) error {
			count++
			return nil
		})
		return count
	}

	// we should not have compacted due to suppression and everything should still be hot
	hotCnt := countBlocks(hot)
	coldCnt := countBlocks(cold)

	if hotCnt != 20 {
		t.Errorf("expected %d blocks, but got %d", 20, hotCnt)
	}

	if coldCnt != 2 {
		t.Errorf("expected %d blocks, but got %d", 2, coldCnt)
	}

	// put some more blocks; now we should compact
	for i := 10; i < 20; i++ {
		stateRoot := blocks.NewBlock([]byte{byte(i), 3, 3, 7})
		curTs = mkBlock(curTs, i, stateRoot)
		waitForCompaction()
	}

	hotCnt = countBlocks(hot)
	coldCnt = countBlocks(cold)

	if hotCnt != 24 {
		t.Errorf("expected %d blocks, but got %d", 24, hotCnt)
	}

	if coldCnt != 18 {
		t.Errorf("expected %d blocks, but got %d", 18, coldCnt)
	}
}

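// testSplitStoreReification verifies that accessing a cold block through the
// supplied accessor leaves the hotstore untouched under a plain context, and
// copies (reifies) the block and the blocks it links to into the hotstore when
// the access happens under a hot view.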
func testSplitStoreReification(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	hot := newMockStore()
	cold := newMockStore()

	mkRandomBlock := func() blocks.Block {
		data := make([]byte, 128)
		_, err := rand.Read(data)
		if err != nil {
			t.Fatal(err)
		}

		return blocks.NewBlock(data)
	}

	block1 := mkRandomBlock()
	block2 := mkRandomBlock()
	block3 := mkRandomBlock()

	hdr := mock.MkBlock(nil, 0, 0)
	hdr.Messages = block1.Cid()
	hdr.ParentMessageReceipts = block2.Cid()
	hdr.ParentStateRoot = block3.Cid()
	block4, err := hdr.ToStorageBlock()
	if err != nil {
		t.Fatal(err)
	}

	allBlocks := []blocks.Block{block1, block2, block3, block4}
	for _, blk := range allBlocks {
		err := cold.Put(context.Background(), blk)
		if err != nil {
			t.Fatal(err)
		}
	}

	path := t.TempDir()

	ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
	if err != nil {
		t.Fatal(err)
	}
	defer ss.Close() //nolint

	ss.warmupEpoch = 1
	go ss.reifyOrchestrator()

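	// waitForReification polls until both the pending and in-progress
	// reification sets have been drained.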
	waitForReification := func() {
		for {
			ss.reifyMx.Lock()
			ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
			ss.reifyMx.Unlock()

			if ready {
				return
			}

			time.Sleep(time.Millisecond)
		}
	}

	// first access using the standard view
	err = f(context.Background(), ss, block4.Cid())
	if err != nil {
		t.Fatal(err)
	}

	// nothing should be reified
	waitForReification()
	for _, blk := range allBlocks {
		has, err := hot.Has(context.Background(), blk.Cid())
		if err != nil {
			t.Fatal(err)
		}

		if has {
			t.Fatal("block unexpectedly reified")
		}
	}

	// now make the hot/reifying view and ensure access reifies
	err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
	if err != nil {
		t.Fatal(err)
	}

	// everything should be reified
	waitForReification()
	for i, blk := range allBlocks {
		has, err := hot.Has(context.Background(), blk.Cid())
		if err != nil {
			t.Fatal(err)
		}

		if !has {
			t.Fatalf("block%d was not reified", i+1)
		}
	}
}

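// testSplitStoreReificationLimit verifies that no blocks are reified when the
// number of blocks that would be copied into the hotstore exceeds ReifyLimit.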
func testSplitStoreReificationLimit(t *testing.T, f func(context.Context, blockstore.Blockstore, cid.Cid) error) {
	ds := dssync.MutexWrap(datastore.NewMapDatastore())
	hot := newMockStore()
	cold := newMockStore()

	mkRandomBlock := func() blocks.Block {
		data := make([]byte, 128)
		_, err := rand.Read(data)
		if err != nil {
			t.Fatal(err)
		}

		return blocks.NewBlock(data)
	}

	block1 := mkRandomBlock()
	block2 := mkRandomBlock()
	block3 := mkRandomBlock()

	hdr := mock.MkBlock(nil, 0, 0)
	hdr.Messages = block1.Cid()
	hdr.ParentMessageReceipts = block2.Cid()
	hdr.ParentStateRoot = block3.Cid()
	block4, err := hdr.ToStorageBlock()
	if err != nil {
		t.Fatal(err)
	}

	allBlocks := []blocks.Block{block1, block2, block3, block4}
	for _, blk := range allBlocks {
		err := cold.Put(context.Background(), blk)
		if err != nil {
			t.Fatal(err)
		}
	}

	path := t.TempDir()

	ss, err := Open(path, ds, hot, cold, &Config{MarkSetType: "map", UniversalColdBlocks: true})
	if err != nil {
		t.Fatal(err)
	}
	defer ss.Close() //nolint

	ss.warmupEpoch = 1
	go ss.reifyOrchestrator()

	waitForReification := func() {
		for {
			ss.reifyMx.Lock()
			ready := len(ss.reifyPend) == 0 && len(ss.reifyInProgress) == 0
			ss.reifyMx.Unlock()

			if ready {
				return
			}

			time.Sleep(time.Millisecond)
		}
	}

	// do a hot access -- nothing should be reified as the limit should be exceeded
	oldReifyLimit := ReifyLimit
	ReifyLimit = 2
	t.Cleanup(func() {
		ReifyLimit = oldReifyLimit
	})

	err = f(blockstore.WithHotView(context.Background()), ss, block4.Cid())
	if err != nil {
		t.Fatal(err)
	}

	waitForReification()

	for _, blk := range allBlocks {
		has, err := hot.Has(context.Background(), blk.Cid())
		if err != nil {
			t.Fatal(err)
		}

		if has {
			t.Fatal("block unexpectedly reified")
		}
	}
}

func TestSplitStoreReification(t *testing.T) {
	t.Log("test reification with Has")
	testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
		_, err := s.Has(ctx, c)
		return err
	})
	t.Log("test reification with Get")
	testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
		_, err := s.Get(ctx, c)
		return err
	})
	t.Log("test reification with GetSize")
	testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
		_, err := s.GetSize(ctx, c)
		return err
	})
	t.Log("test reification with View")
	testSplitStoreReification(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
		return s.View(ctx, c, func(_ []byte) error { return nil })
	})
	t.Log("test reification limit")
	testSplitStoreReificationLimit(t, func(ctx context.Context, s blockstore.Blockstore, c cid.Cid) error {
		_, err := s.Has(ctx, c)
		return err
	})
}

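// mockChain is a minimal in-memory chain used by the tests: it records pushed
// tipsets, supports reverting the most recent ones, and notifies a single
// head-change listener on every push or revert.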
type mockChain struct {
	t testing.TB

	sync.Mutex
	genesis  *types.BlockHeader
	tipsets  []*types.TipSet
	listener func(revert []*types.TipSet, apply []*types.TipSet) error
}

func (c *mockChain) push(ts *types.TipSet) {
	c.Lock()
	c.tipsets = append(c.tipsets, ts)
	if c.genesis == nil {
		c.genesis = ts.Blocks()[0]
	}
	c.Unlock()

	if c.listener != nil {
		err := c.listener(nil, []*types.TipSet{ts})
		if err != nil {
			c.t.Errorf("mockchain: error dispatching listener: %s", err)
		}
	}
}

func (c *mockChain) revert(count int) {
	c.Lock()
	revert := make([]*types.TipSet, count)
	if count > len(c.tipsets) {
		c.Unlock()
		c.t.Fatalf("not enough tipsets to revert")
	}
	copy(revert, c.tipsets[len(c.tipsets)-count:])
	c.tipsets = c.tipsets[:len(c.tipsets)-count]
	c.Unlock()

	if c.listener != nil {
		err := c.listener(revert, nil)
		if err != nil {
			c.t.Errorf("mockchain: error dispatching listener: %s", err)
		}
	}
}

func (c *mockChain) GetTipsetByHeight(_ context.Context, epoch abi.ChainEpoch, _ *types.TipSet, _ bool) (*types.TipSet, error) {
	c.Lock()
	defer c.Unlock()

	iEpoch := int(epoch)
	if iEpoch >= len(c.tipsets) {
		return nil, fmt.Errorf("bad epoch %d", epoch)
	}

	return c.tipsets[iEpoch], nil
}

func (c *mockChain) GetHeaviestTipSet() *types.TipSet {
	c.Lock()
	defer c.Unlock()

	return c.tipsets[len(c.tipsets)-1]
}

func (c *mockChain) SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error) {
	c.listener = change
}

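// mockStore is a mutex-protected in-memory blockstore keyed by multihash; it
// implements the blockstore operations the splitstore needs, including
// ForEachKey for key iteration.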
type mockStore struct {
	mx  sync.Mutex
	set map[string]blocks.Block
}

func newMockStore() *mockStore {
	return &mockStore{set: make(map[string]blocks.Block)}
}

func (b *mockStore) keyOf(c cid.Cid) string {
	return string(c.Hash())
}

func (b *mockStore) cidOf(k string) cid.Cid {
	return cid.NewCidV1(cid.Raw, mh.Multihash([]byte(k)))
}

func (b *mockStore) Has(_ context.Context, cid cid.Cid) (bool, error) {
	b.mx.Lock()
	defer b.mx.Unlock()
	_, ok := b.set[b.keyOf(cid)]
	return ok, nil
}

func (b *mockStore) HashOnRead(hor bool) {}

func (b *mockStore) Get(_ context.Context, cid cid.Cid) (blocks.Block, error) {
	b.mx.Lock()
	defer b.mx.Unlock()

	blk, ok := b.set[b.keyOf(cid)]
	if !ok {
		return nil, ipld.ErrNotFound{Cid: cid}
	}
	return blk, nil
}

func (b *mockStore) GetSize(ctx context.Context, cid cid.Cid) (int, error) {
	blk, err := b.Get(ctx, cid)
	if err != nil {
		return 0, err
	}

	return len(blk.RawData()), nil
}

func (b *mockStore) View(ctx context.Context, cid cid.Cid, f func([]byte) error) error {
	blk, err := b.Get(ctx, cid)
	if err != nil {
		return err
	}
	return f(blk.RawData())
}

func (b *mockStore) Put(_ context.Context, blk blocks.Block) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	b.set[b.keyOf(blk.Cid())] = blk
	return nil
}

func (b *mockStore) PutMany(_ context.Context, blks []blocks.Block) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	for _, blk := range blks {
		b.set[b.keyOf(blk.Cid())] = blk
	}
	return nil
}

func (b *mockStore) DeleteBlock(_ context.Context, cid cid.Cid) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	delete(b.set, b.keyOf(cid))
	return nil
}

func (b *mockStore) DeleteMany(_ context.Context, cids []cid.Cid) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	for _, c := range cids {
		delete(b.set, b.keyOf(c))
	}
	return nil
}

func (b *mockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	return nil, errors.New("not implemented")
}

func (b *mockStore) ForEachKey(f func(cid.Cid) error) error {
	b.mx.Lock()
	defer b.mx.Unlock()

	for c := range b.set {
		err := f(b.cidOf(c))
		if err != nil {
			return err
		}
	}
	return nil
}

func (b *mockStore) Close() error {
	return nil
}