diff --git a/chain/gen/gen.go b/chain/gen/gen.go index 554674e9d..fa942a23b 100644 --- a/chain/gen/gen.go +++ b/chain/gen/gen.go @@ -214,7 +214,7 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, []*types.SignedMessage, error msgs := make([]*types.SignedMessage, cg.msgsPerBlock) for m := range msgs { msg := types.Message{ - To: cg.receivers[m], + To: cg.receivers[m%len(cg.receivers)], From: cg.banker, Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1, diff --git a/chain/gen/gen_test.go b/chain/gen/gen_test.go index 73333f238..84fbf31e5 100644 --- a/chain/gen/gen_test.go +++ b/chain/gen/gen_test.go @@ -4,12 +4,14 @@ import ( "testing" ) -func testGeneration(t testing.TB, n int) { +func testGeneration(t testing.TB, n int, msgs int) { g, err := NewGenerator() if err != nil { t.Fatal(err) } + g.msgsPerBlock = msgs + for i := 0; i < n; i++ { b, _, err := g.NextBlock() if err != nil { @@ -22,9 +24,23 @@ func testGeneration(t testing.TB, n int) { } func TestChainGeneration(t *testing.T) { - testGeneration(t, 10) + testGeneration(t, 10, 20) } func BenchmarkChainGeneration(b *testing.B) { - testGeneration(b, b.N) + b.Run("0-messages", func(b *testing.B) { + testGeneration(b, b.N, 0) + }) + + b.Run("10-messages", func(b *testing.B) { + testGeneration(b, b.N, 10) + }) + + b.Run("100-messages", func(b *testing.B) { + testGeneration(b, b.N, 100) + }) + + b.Run("1000-messages", func(b *testing.B) { + testGeneration(b, b.N, 1000) + }) } diff --git a/chain/store/store.go b/chain/store/store.go index 8c35e4564..cc22bd5bf 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -135,11 +135,15 @@ func (cs *ChainStore) SubHeadChanges(ctx context.Context) chan *HeadChange { select { case val, ok := <-subch: if !ok { + log.Warn("chain head sub exit loop") return } - out <- val.(*HeadChange) + select { + case out <- val.(*HeadChange): + case <-ctx.Done(): + } case <-ctx.Done(): - cs.bestTips.Unsub(subch) + go cs.bestTips.Unsub(subch) } } }() diff --git a/chain/sync_test.go b/chain/sync_test.go index 4928ce79c..d548d7394 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "testing" - "time" logging "github.com/ipfs/go-log" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" @@ -22,7 +21,7 @@ import ( const source = 0 -func (tu *syncTestUtil) repoWithChain(t *testing.T, h int) (repo.Repo, []byte, []*types.FullBlock) { +func (tu *syncTestUtil) repoWithChain(t testing.TB, h int) (repo.Repo, []byte, []*types.FullBlock) { blks := make([]*types.FullBlock, h) for i := 0; i < h; i++ { @@ -45,10 +44,12 @@ func (tu *syncTestUtil) repoWithChain(t *testing.T, h int) (repo.Repo, []byte, [ } type syncTestUtil struct { - t *testing.T + t testing.TB - ctx context.Context - mn mocknet.Mocknet + ctx context.Context + cancel func() + + mn mocknet.Mocknet g *gen.ChainGen @@ -58,7 +59,7 @@ type syncTestUtil struct { nds []api.FullNode } -func prepSyncTest(t *testing.T, h int) *syncTestUtil { +func prepSyncTest(t testing.TB, h int) *syncTestUtil { logging.SetLogLevel("*", "INFO") g, err := gen.NewGenerator() @@ -66,12 +67,15 @@ func prepSyncTest(t *testing.T, h int) *syncTestUtil { t.Fatal(err) } - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) + tu := &syncTestUtil{ - t: t, - ctx: ctx, - mn: mocknet.New(ctx), - g: g, + t: t, + ctx: ctx, + cancel: cancel, + + mn: mocknet.New(ctx), + g: g, } tu.addSourceNode(h) @@ -83,6 +87,10 @@ func prepSyncTest(t *testing.T, h int) *syncTestUtil { return tu } +func (tu *syncTestUtil) Shutdown() { + tu.cancel() +} + func (tu *syncTestUtil) mineNewBlock(src int) { fblk, msgs, err := tu.g.NextBlock() require.NoError(tu.t, err) @@ -185,6 +193,28 @@ func (tu *syncTestUtil) compareSourceState(with int) { } } +func (tu *syncTestUtil) waitUntilSync(from, to int) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + target, err := tu.nds[from].ChainHead(ctx) + if err != nil { + tu.t.Fatal(err) + } + + hc, err := tu.nds[to].ChainNotify(ctx) + if err != nil { + tu.t.Fatal(err) + } + + // TODO: some sort of timeout? + for c := range hc { + if c.Val.Equals(target) { + return + } + } +} + func (tu *syncTestUtil) submitSourceBlock(to int, h int) { // utility to simulate incoming blocks without miner process // TODO: should call syncer directly, this won't work correctly in all cases @@ -218,7 +248,7 @@ func TestSyncSimple(t *testing.T) { require.NoError(t, tu.mn.LinkAll()) tu.connect(1, 0) - time.Sleep(time.Second * 3) + tu.waitUntilSync(0, client) tu.checkHeight("client", client, H) @@ -226,15 +256,18 @@ func TestSyncSimple(t *testing.T) { } func TestSyncMining(t *testing.T) { - H := 50 + H := 100 tu := prepSyncTest(t, H) client := tu.addClientNode() tu.checkHeight("client", client, 0) require.NoError(t, tu.mn.LinkAll()) - tu.connect(1, 0) - time.Sleep(time.Second * 4) + tu.connect(client, 0) + fmt.Println("waiting for sync...") + tu.waitUntilSync(0, client) + + fmt.Println("after wait until sync") tu.checkHeight("client", client, H) @@ -242,11 +275,31 @@ func TestSyncMining(t *testing.T) { for i := 0; i < 5; i++ { tu.mineNewBlock(0) - time.Sleep(time.Second / 2) + tu.waitUntilSync(0, client) tu.compareSourceState(client) } } +func BenchmarkSyncBasic(b *testing.B) { + for i := 0; i < b.N; i++ { + runSyncBenchLength(b, 100) + } +} + +func runSyncBenchLength(b *testing.B, l int) { + tu := prepSyncTest(b, l) + + client := tu.addClientNode() + tu.checkHeight("client", client, 0) + + b.ResetTimer() + + require.NoError(b, tu.mn.LinkAll()) + tu.connect(1, 0) + + tu.waitUntilSync(0, client) +} + /* TODO: this is broken because of how tu.submitSourceBlock works now func TestSyncManual(t *testing.T) {