chain: Test 'manual' sync

This commit is contained in:
Łukasz Magiera 2019-07-30 18:04:36 +02:00 committed by whyrusleeping
parent 412a168151
commit cdf0e0c858
5 changed files with 80 additions and 21 deletions

View File

@ -57,7 +57,7 @@ func (m mybs) Get(c cid.Cid) (block.Block, error) {
b, err := m.Blockstore.Get(c) b, err := m.Blockstore.Get(c)
if err != nil { if err != nil {
// change to error for stacktraces, don't commit with that pls // change to error for stacktraces, don't commit with that pls
log.Warnf("Get failed: %s %s", c, err) log.Warn("Get failed: %s %s", c, err)
return nil, err return nil, err
} }

View File

@ -35,7 +35,7 @@ func NewStateTree(cst *hamt.CborIpldStore) (*StateTree, error) {
func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) { func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) {
nd, err := hamt.LoadNode(context.Background(), cst, c) nd, err := hamt.LoadNode(context.Background(), cst, c)
if err != nil { if err != nil {
log.Errorf("loading hamt node failed: %s", err) log.Errorf("loading hamt node %s failed: %s", c, err)
return nil, err return nil, err
} }

View File

@ -11,24 +11,32 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/go-lotus/api" "github.com/filecoin-project/go-lotus/api"
"github.com/filecoin-project/go-lotus/chain"
"github.com/filecoin-project/go-lotus/chain/gen" "github.com/filecoin-project/go-lotus/chain/gen"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/node" "github.com/filecoin-project/go-lotus/node"
"github.com/filecoin-project/go-lotus/node/impl"
"github.com/filecoin-project/go-lotus/node/modules" "github.com/filecoin-project/go-lotus/node/modules"
"github.com/filecoin-project/go-lotus/node/repo" "github.com/filecoin-project/go-lotus/node/repo"
) )
const source = 0 const source = 0
func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) { func repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) {
g, err := gen.NewGenerator() g, err := gen.NewGenerator()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
blks := make([]*types.FullBlock, h)
for i := 0; i < h; i++ { for i := 0; i < h; i++ {
b, err := g.NextBlock() blks[i], err = g.NextBlock()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(i+1), b.Header.Height, "wrong height")
fmt.Printf("block at H:%d: %s\n", blks[i].Header.Height, blks[i].Cid())
require.Equal(t, uint64(i+1), blks[i].Header.Height, "wrong height")
} }
r, err := g.YieldRepo() r, err := g.YieldRepo()
@ -37,7 +45,7 @@ func repoWithChain(t *testing.T, h int) (repo.Repo, []byte) {
genb, err := g.GenesisCar() genb, err := g.GenesisCar()
require.NoError(t, err) require.NoError(t, err)
return r, genb return r, genb, blks
} }
type syncTestUtil struct { type syncTestUtil struct {
@ -47,16 +55,36 @@ type syncTestUtil struct {
mn mocknet.Mocknet mn mocknet.Mocknet
genesis []byte genesis []byte
blocks []*types.FullBlock
nds []api.FullNode nds []api.FullNode
} }
func prepSyncTest(t *testing.T, h int) *syncTestUtil {
logging.SetLogLevel("*", "INFO")
ctx := context.Background()
tu := &syncTestUtil{
t: t,
ctx: ctx,
mn: mocknet.New(ctx),
}
tu.addSourceNode(h)
tu.checkHeight("source", source, h)
// separate logs
fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b")
return tu
}
func (tu *syncTestUtil) addSourceNode(gen int) { func (tu *syncTestUtil) addSourceNode(gen int) {
if tu.genesis != nil { if tu.genesis != nil {
tu.t.Fatal("source node already exists") tu.t.Fatal("source node already exists")
} }
sourceRepo, genesis := repoWithChain(tu.t, gen) sourceRepo, genesis, blocks := repoWithChain(tu.t, gen)
var out api.FullNode var out api.FullNode
err := node.New(tu.ctx, err := node.New(tu.ctx,
@ -70,6 +98,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
require.NoError(tu.t, err) require.NoError(tu.t, err)
tu.genesis = genesis tu.genesis = genesis
tu.blocks = blocks
tu.nds = append(tu.nds, out) // always at 0 tu.nds = append(tu.nds, out) // always at 0
} }
@ -117,36 +146,64 @@ func (tu *syncTestUtil) compareSourceState(with int) {
for _, addr := range sourceAccounts { for _, addr := range sourceAccounts {
sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr) sourceBalance, err := tu.nds[source].WalletBalance(tu.ctx, addr)
require.NoError(tu.t, err) require.NoError(tu.t, err)
fmt.Printf("Source state check for %s, expect %s\n", addr, sourceBalance)
actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr) actBalance, err := tu.nds[with].WalletBalance(tu.ctx, addr)
require.NoError(tu.t, err) require.NoError(tu.t, err)
require.Equal(tu.t, sourceBalance, actBalance) require.Equal(tu.t, sourceBalance, actBalance)
fmt.Printf("Source state check <OK> for %s\n", addr)
}
}
func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
// utility to simulate incoming blocks without miner process
var b chain.BlockMsg
// -1 to match block.Height
b.Header = tu.blocks[h - 1].Header
for _, msg := range tu.blocks[h - 1].Messages {
c, err := tu.nds[to].(*impl.FullNodeAPI).Chain.PutMessage(msg)
require.NoError(tu.t, err)
b.Messages = append(b.Messages, c)
}
require.NoError(tu.t, tu.nds[to].ChainSubmitBlock(tu.ctx, &b))
}
func (tu *syncTestUtil) submitSourceBlocks(to int, h int, n int) {
for i := 0; i < n; i++ {
tu.submitSourceBlock(to, h + i)
} }
} }
func TestSyncSimple(t *testing.T) { func TestSyncSimple(t *testing.T) {
logging.SetLogLevel("*", "INFO")
H := 20 H := 20
ctx := context.Background() tu := prepSyncTest(t, H)
tu := &syncTestUtil{
t: t,
ctx: ctx,
mn: mocknet.New(ctx),
}
tu.addSourceNode(H)
tu.checkHeight("source", source, H)
// separate logs
fmt.Println("\x1b[31m///////////////////////////////////////////////////\x1b[39b")
client := tu.addClientNode() client := tu.addClientNode()
tu.checkHeight("client", client, 0) tu.checkHeight("client", client, 0)
require.NoError(t, tu.mn.LinkAll()) require.NoError(t, tu.mn.LinkAll())
tu.connect(1, 0) tu.connect(1, 0)
time.Sleep(time.Second * 3)
tu.checkHeight("client", client, H)
tu.compareSourceState(client)
}
func TestSyncManual(t *testing.T) {
H := 2
tu := prepSyncTest(t, H)
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
tu.submitSourceBlocks(client, 1, H)
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
tu.checkHeight("client", client, H) tu.checkHeight("client", client, H)

View File

@ -6,6 +6,7 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-filestore" "github.com/ipfs/go-filestore"
exchange "github.com/ipfs/go-ipfs-exchange-interface" exchange "github.com/ipfs/go-ipfs-exchange-interface"

View File

@ -60,6 +60,7 @@ func Bitswap(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, rt routin
return exch.Close() return exch.Close()
}, },
}) })
return exch return exch
} }