Merge pull request #170 from filecoin-project/feat/test-util-sync-wait

add testutil sync wait
This commit is contained in:
Whyrusleeping 2019-09-03 14:32:42 +10:00 committed by GitHub
commit 56e465bb19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 22 deletions

View File

@ -214,7 +214,7 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, []*types.SignedMessage, error
msgs := make([]*types.SignedMessage, cg.msgsPerBlock) msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
for m := range msgs { for m := range msgs {
msg := types.Message{ msg := types.Message{
To: cg.receivers[m], To: cg.receivers[m%len(cg.receivers)],
From: cg.banker, From: cg.banker,
Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1, Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1,

View File

@ -4,12 +4,14 @@ import (
"testing" "testing"
) )
func testGeneration(t testing.TB, n int) { func testGeneration(t testing.TB, n int, msgs int) {
g, err := NewGenerator() g, err := NewGenerator()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
g.msgsPerBlock = msgs
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
b, _, err := g.NextBlock() b, _, err := g.NextBlock()
if err != nil { if err != nil {
@ -22,9 +24,23 @@ func testGeneration(t testing.TB, n int) {
} }
func TestChainGeneration(t *testing.T) { func TestChainGeneration(t *testing.T) {
testGeneration(t, 10) testGeneration(t, 10, 20)
} }
func BenchmarkChainGeneration(b *testing.B) { func BenchmarkChainGeneration(b *testing.B) {
testGeneration(b, b.N) b.Run("0-messages", func(b *testing.B) {
testGeneration(b, b.N, 0)
})
b.Run("10-messages", func(b *testing.B) {
testGeneration(b, b.N, 10)
})
b.Run("100-messages", func(b *testing.B) {
testGeneration(b, b.N, 100)
})
b.Run("1000-messages", func(b *testing.B) {
testGeneration(b, b.N, 1000)
})
} }

View File

@ -135,11 +135,15 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange {
select { select {
case val, ok := <-subch: case val, ok := <-subch:
if !ok { if !ok {
log.Warn("chain head sub exit loop")
return return
} }
out <- val.(*HeadChange) select {
case out <- val.(*HeadChange):
case <-ctx.Done(): case <-ctx.Done():
cs.bestTips.Unsub(subch) }
case <-ctx.Done():
go cs.bestTips.Unsub(subch)
} }
} }
}() }()

View File

@ -4,7 +4,6 @@ import (
"context" "context"
"fmt" "fmt"
"testing" "testing"
"time"
logging "github.com/ipfs/go-log" logging "github.com/ipfs/go-log"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
@ -22,7 +21,7 @@ import (
const source = 0 const source = 0
func (tu *syncTestUtil) repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) { func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, []*types.FullBlock) {
blks := make([]*types.FullBlock, h) blks := make([]*types.FullBlock, h)
for i := 0; i < h; i++ { for i := 0; i < h; i++ {
@ -45,9 +44,11 @@ func (tu *syncTestUtil) repoWithChain(t *testing.T, h int) (repo.Repo, []byte, [
} }
type syncTestUtil struct { type syncTestUtil struct {
t *testing.T t testing.TB
ctx context.Context ctx context.Context
cancel func()
mn mocknet.Mocknet mn mocknet.Mocknet
g *gen.ChainGen g *gen.ChainGen
@ -58,7 +59,7 @@ type syncTestUtil struct {
nds []api.FullNode nds []api.FullNode
} }
func prepSyncTest(t *testing.T, h int) *syncTestUtil { func prepSyncTest(t testing.TB, h int) *syncTestUtil {
logging.SetLogLevel("*", "INFO") logging.SetLogLevel("*", "INFO")
g, err := gen.NewGenerator() g, err := gen.NewGenerator()
@ -66,10 +67,13 @@ func prepSyncTest(t *testing.T, h int) *syncTestUtil {
t.Fatal(err) t.Fatal(err)
} }
ctx := context.Background() ctx, cancel := context.WithCancel(context.Background())
tu := &syncTestUtil{ tu := &syncTestUtil{
t: t, t: t,
ctx: ctx, ctx: ctx,
cancel: cancel,
mn: mocknet.New(ctx), mn: mocknet.New(ctx),
g: g, g: g,
} }
@ -83,6 +87,10 @@ func prepSyncTest(t *testing.T, h int) *syncTestUtil {
return tu return tu
} }
func (tu *syncTestUtil) Shutdown() {
tu.cancel()
}
func (tu *syncTestUtil) mineNewBlock(src int) { func (tu *syncTestUtil) mineNewBlock(src int) {
fblk, msgs, err := tu.g.NextBlock() fblk, msgs, err := tu.g.NextBlock()
require.NoError(tu.t, err) require.NoError(tu.t, err)
@ -185,6 +193,28 @@ func (tu *syncTestUtil) compareSourceState(with int) {
} }
} }
func (tu *syncTestUtil) waitUntilSync(from, to int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
target, err := tu.nds[from].ChainHead(ctx)
if err != nil {
tu.t.Fatal(err)
}
hc, err := tu.nds[to].ChainNotify(ctx)
if err != nil {
tu.t.Fatal(err)
}
// TODO: some sort of timeout?
for c := range hc {
if c.Val.Equals(target) {
return
}
}
}
func (tu *syncTestUtil) submitSourceBlock(to int, h int) { func (tu *syncTestUtil) submitSourceBlock(to int, h int) {
// utility to simulate incoming blocks without miner process // utility to simulate incoming blocks without miner process
// TODO: should call syncer directly, this won't work correctly in all cases // TODO: should call syncer directly, this won't work correctly in all cases
@ -218,7 +248,7 @@ func TestSyncSimple(t *testing.T) {
require.NoError(t, tu.mn.LinkAll()) require.NoError(t, tu.mn.LinkAll())
tu.connect(1, 0) tu.connect(1, 0)
time.Sleep(time.Second * 3) tu.waitUntilSync(0, client)
tu.checkHeight("client", client, H) tu.checkHeight("client", client, H)
@ -226,15 +256,18 @@ func TestSyncSimple(t *testing.T) {
} }
func TestSyncMining(t *testing.T) { func TestSyncMining(t *testing.T) {
H := 50 H := 100
tu := prepSyncTest(t, H) tu := prepSyncTest(t, H)
client := tu.addClientNode() client := tu.addClientNode()
tu.checkHeight("client", client, 0) tu.checkHeight("client", client, 0)
require.NoError(t, tu.mn.LinkAll()) require.NoError(t, tu.mn.LinkAll())
tu.connect(1, 0) tu.connect(client, 0)
time.Sleep(time.Second * 4) fmt.Println("waiting for sync...")
tu.waitUntilSync(0, client)
fmt.Println("after wait until sync")
tu.checkHeight("client", client, H) tu.checkHeight("client", client, H)
@ -242,11 +275,31 @@ func TestSyncMining(t *testing.T) {
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
tu.mineNewBlock(0) tu.mineNewBlock(0)
time.Sleep(time.Second / 2) tu.waitUntilSync(0, client)
tu.compareSourceState(client) tu.compareSourceState(client)
} }
} }
func BenchmarkSyncBasic(b *testing.B) {
for i := 0; i < b.N; i++ {
runSyncBenchLength(b, 100)
}
}
func runSyncBenchLength(b *testing.B, l int) {
tu := prepSyncTest(b, l)
client := tu.addClientNode()
tu.checkHeight("client", client, 0)
b.ResetTimer()
require.NoError(b, tu.mn.LinkAll())
tu.connect(1, 0)
tu.waitUntilSync(0, client)
}
/* /*
TODO: this is broken because of how tu.submitSourceBlock works now TODO: this is broken because of how tu.submitSourceBlock works now
func TestSyncManual(t *testing.T) { func TestSyncManual(t *testing.T) {