diff --git a/itests/kit/blockminer.go b/itests/kit/blockminer.go index c1061b558..878f4a663 100644 --- a/itests/kit/blockminer.go +++ b/itests/kit/blockminer.go @@ -83,10 +83,10 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type } // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool -// and everything shuts down if a post fails +// and everything shuts down if a post fails. It also enforces that every block mined succeeds func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { - time.Sleep(time.Second) + time.Sleep(3 * time.Second) // wrap context in a cancellable context. ctx, bm.cancel = context.WithCancel(ctx) @@ -96,7 +96,20 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur activeDeadlines := make(map[int]struct{}) _ = activeDeadlines - + ts, err := bm.miner.FullNode.ChainHead(ctx) + require.NoError(bm.t, err) + wait := make(chan bool) + reportSuccessFn := func(success bool, epoch abi.ChainEpoch, err error) { + bm.t.Logf("done with mine one at epoch %d, success %t", epoch, success) + require.NoError(bm.t, err) + wait <- success + } + chg, err := bm.miner.FullNode.ChainNotify(ctx) + require.NoError(bm.t, err) + // read current out + curr := <-chg + require.Equal(bm.t, ts.Height(), curr[0].Val.Height()) + numMined := curr[0].Val.Height() for { select { case <-time.After(blocktime): @@ -110,6 +123,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur ts, err := bm.miner.FullNode.ChainHead(ctx) require.NoError(bm.t, err) tsk := ts.Key() + bm.t.Logf("Miner sees head ts: %s at height %d, num mined = %d", tsk, ts.Height(), numMined) dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) require.NoError(bm.t, err) if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted @@ -132,10 +146,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur bm.t.Logf("post missing from mpool, block mining suspended until it arrives") POOL: for { + bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key()) select { case <-ctx.Done(): return case evt := <-poolEvts: + bm.t.Logf("pool event: %d", evt.Type) if evt.Type == api.MpoolAdd { bm.t.Logf("incoming message %v", evt.Message) if tracker.recordIfPost(bm.t, bm, evt.Message) { @@ -144,17 +160,53 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur } } } - + bm.t.Logf("done waiting on mpool") } - } - } - err = bm.miner.MineOne(ctx, miner.MineReq{ - InjectNulls: abi.ChainEpoch(nulls), - Done: func(bool, abi.ChainEpoch, error) {}, - }) + baseHeight := ts.Height() + + syncedToHeight := func(target abi.ChainEpoch) { + headChangeCh, err := bm.miner.FullNode.ChainNotify(ctx) + require.NoError(bm.t, err) + hccurrent, ok1 := <-headChangeCh + for !ok1 { + hccurrent, ok1 = <-headChangeCh + } + if hccurrent[0].Val.Height() >= target { + return + } + var ok2 bool + for { + var headChanges []*api.HeadChange + select { + case headChanges, ok2 = <-headChangeCh: + if !ok2 { // if channel is closed on us fail + bm.t.Log("channel closed") + bm.t.Fatal("chain notify channel closed while waiting to sync") + } + for _, hc := range headChanges { + if hc.Val.Height() >= target { + return + } + } + case <-ctx.Done(): + return + } + } + } + + var success bool + for i := int64(0); !success; i++ { + err = bm.miner.MineOne(ctx, miner.MineReq{ + InjectNulls: abi.ChainEpoch(nulls + i), + Done: reportSuccessFn, + }) + success = <-wait + } + syncedToHeight(baseHeight + 1) + numMined += 1 switch { case err == nil: // wrap around case ctx.Err() != nil: // context fired.