Deflake snap deals integration test

This commit is contained in:
zenground0 2022-01-05 08:24:46 -05:00 committed by Jennifer Wang
parent d6aa17e21f
commit a82ac00d9a

View File

@ -83,10 +83,10 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type
} }
// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool // Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
// and everything shuts down if a post fails // and everything shuts down if a post fails. It also enforces that every block mined succeeds
func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) { func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) {
time.Sleep(time.Second) time.Sleep(3 * time.Second)
// wrap context in a cancellable context. // wrap context in a cancellable context.
ctx, bm.cancel = context.WithCancel(ctx) ctx, bm.cancel = context.WithCancel(ctx)
@ -96,7 +96,20 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
activeDeadlines := make(map[int]struct{}) activeDeadlines := make(map[int]struct{})
_ = activeDeadlines _ = activeDeadlines
ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err)
wait := make(chan bool)
reportSuccessFn := func(success bool, epoch abi.ChainEpoch, err error) {
bm.t.Logf("done with mine one at epoch %d, success %t", epoch, success)
require.NoError(bm.t, err)
wait <- success
}
chg, err := bm.miner.FullNode.ChainNotify(ctx)
require.NoError(bm.t, err)
// read current out
curr := <-chg
require.Equal(bm.t, ts.Height(), curr[0].Val.Height())
numMined := curr[0].Val.Height()
for { for {
select { select {
case <-time.After(blocktime): case <-time.After(blocktime):
@ -110,6 +123,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
ts, err := bm.miner.FullNode.ChainHead(ctx) ts, err := bm.miner.FullNode.ChainHead(ctx)
require.NoError(bm.t, err) require.NoError(bm.t, err)
tsk := ts.Key() tsk := ts.Key()
bm.t.Logf("Miner sees head ts: %s at height %d, num mined = %d", tsk, ts.Height(), numMined)
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk) dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk)
require.NoError(bm.t, err) require.NoError(bm.t, err)
if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted
@ -132,10 +146,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
bm.t.Logf("post missing from mpool, block mining suspended until it arrives") bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
POOL: POOL:
for { for {
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case evt := <-poolEvts: case evt := <-poolEvts:
bm.t.Logf("pool event: %d", evt.Type)
if evt.Type == api.MpoolAdd { if evt.Type == api.MpoolAdd {
bm.t.Logf("incoming message %v", evt.Message) bm.t.Logf("incoming message %v", evt.Message)
if tracker.recordIfPost(bm.t, bm, evt.Message) { if tracker.recordIfPost(bm.t, bm, evt.Message) {
@ -144,17 +160,53 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
} }
} }
} }
bm.t.Logf("done waiting on mpool")
} }
}
} }
baseHeight := ts.Height()
syncedToHeight := func(target abi.ChainEpoch) {
headChangeCh, err := bm.miner.FullNode.ChainNotify(ctx)
require.NoError(bm.t, err)
hccurrent, ok1 := <-headChangeCh
for !ok1 {
hccurrent, ok1 = <-headChangeCh
}
if hccurrent[0].Val.Height() >= target {
return
}
var ok2 bool
for {
var headChanges []*api.HeadChange
select {
case headChanges, ok2 = <-headChangeCh:
if !ok2 { // if channel is closed on us fail
bm.t.Log("channel closed")
bm.t.Fatal("chain notify channel closed while waiting to sync")
}
for _, hc := range headChanges {
if hc.Val.Height() >= target {
return
}
}
case <-ctx.Done():
return
}
}
} }
var success bool
for i := int64(0); !success; i++ {
err = bm.miner.MineOne(ctx, miner.MineReq{ err = bm.miner.MineOne(ctx, miner.MineReq{
InjectNulls: abi.ChainEpoch(nulls), InjectNulls: abi.ChainEpoch(nulls + i),
Done: func(bool, abi.ChainEpoch, error) {}, Done: reportSuccessFn,
}) })
success = <-wait
}
syncedToHeight(baseHeight + 1)
numMined += 1
switch { switch {
case err == nil: // wrap around case err == nil: // wrap around
case ctx.Err() != nil: // context fired. case ctx.Err() != nil: // context fired.