Merge pull request #8088 from filecoin-project/fix/flakey-paychapi-test
fix:paychan:deflake integration test
This commit is contained in:
commit
6926d63655
@ -3,6 +3,7 @@ package kit
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
@ -10,6 +11,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
aminer "github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
@ -63,11 +65,10 @@ func (p *partitionTracker) done(t *testing.T) bool {
|
||||
return uint64(len(p.partitions)) == p.count(t)
|
||||
}
|
||||
|
||||
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *types.SignedMessage) (ret bool) {
|
||||
func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, msg *types.Message) (ret bool) {
|
||||
defer func() {
|
||||
ret = p.done(t)
|
||||
}()
|
||||
msg := smsg.Message
|
||||
if !(msg.To == bm.miner.ActorAddr) {
|
||||
return
|
||||
}
|
||||
@ -82,11 +83,69 @@ func (p *partitionTracker) recordIfPost(t *testing.T, bm *BlockMiner, smsg *type
|
||||
return
|
||||
}
|
||||
|
||||
func (bm *BlockMiner) forcePoSt(ctx context.Context, ts *types.TipSet, dlinfo *dline.Info) {
|
||||
|
||||
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
|
||||
if !tracker.done(bm.t) { // need to wait for post
|
||||
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
|
||||
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx) //subscribe before checking pending so we don't miss any events
|
||||
require.NoError(bm.t, err)
|
||||
|
||||
// First check pending messages we'll mine this epoch
|
||||
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs {
|
||||
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
|
||||
fmt.Printf("found post in mempool pending\n")
|
||||
}
|
||||
}
|
||||
|
||||
// Account for included but not yet executed messages
|
||||
for _, bc := range ts.Cids() {
|
||||
msgs, err := bm.miner.FullNode.ChainGetBlockMessages(ctx, bc)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs.BlsMessages {
|
||||
if tracker.recordIfPost(bm.t, bm, msg) {
|
||||
fmt.Printf("found post in message of prev tipset\n")
|
||||
}
|
||||
|
||||
}
|
||||
for _, msg := range msgs.SecpkMessages {
|
||||
if tracker.recordIfPost(bm.t, bm, &msg.Message) {
|
||||
fmt.Printf("found post in message of prev tipset\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// post not yet in mpool, wait for it
|
||||
if !tracker.done(bm.t) {
|
||||
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
|
||||
POOL:
|
||||
for {
|
||||
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt := <-poolEvts:
|
||||
bm.t.Logf("pool event: %d", evt.Type)
|
||||
if evt.Type == api.MpoolAdd {
|
||||
bm.t.Logf("incoming message %v", evt.Message)
|
||||
if tracker.recordIfPost(bm.t, bm, &evt.Message.Message) {
|
||||
fmt.Printf("found post in mempool evt\n")
|
||||
break POOL
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bm.t.Logf("done waiting on mpool")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Like MineBlocks but refuses to mine until the window post scheduler has wdpost messages in the mempool
|
||||
// and everything shuts down if a post fails. It also enforces that every block mined succeeds
|
||||
func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Duration) {
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// wrap context in a cancellable context.
|
||||
ctx, bm.cancel = context.WithCancel(ctx)
|
||||
@ -94,8 +153,6 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
go func() {
|
||||
defer bm.wg.Done()
|
||||
|
||||
activeDeadlines := make(map[int]struct{})
|
||||
_ = activeDeadlines
|
||||
ts, err := bm.miner.FullNode.ChainHead(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
wait := make(chan bool)
|
||||
@ -103,7 +160,7 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
require.NoError(bm.t, err)
|
||||
// read current out
|
||||
curr := <-chg
|
||||
require.Equal(bm.t, ts.Height(), curr[0].Val.Height())
|
||||
require.Equal(bm.t, ts.Height(), curr[0].Val.Height(), "failed sanity check: are multiple miners mining with must post?")
|
||||
for {
|
||||
select {
|
||||
case <-time.After(blocktime):
|
||||
@ -111,52 +168,15 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
return
|
||||
}
|
||||
nulls := atomic.SwapInt64(&bm.nextNulls, 0)
|
||||
require.Equal(bm.t, int64(0), nulls, "Injecting > 0 null blocks while `MustPost` mining is currently unsupported")
|
||||
|
||||
// Wake up and figure out if we are at the end of an active deadline
|
||||
ts, err := bm.miner.FullNode.ChainHead(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
tsk := ts.Key()
|
||||
|
||||
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, tsk)
|
||||
dlinfo, err := bm.miner.FullNode.StateMinerProvingDeadline(ctx, bm.miner.ActorAddr, ts.Key())
|
||||
require.NoError(bm.t, err)
|
||||
if ts.Height()+1 == dlinfo.Last() { // Last epoch in dline, we need to check that miner has posted
|
||||
|
||||
tracker := newPartitionTracker(ctx, dlinfo.Index, bm)
|
||||
if !tracker.done(bm.t) { // need to wait for post
|
||||
bm.t.Logf("expect %d partitions proved but only see %d", len(tracker.partitions), tracker.count(bm.t))
|
||||
poolEvts, err := bm.miner.FullNode.MpoolSub(ctx)
|
||||
require.NoError(bm.t, err)
|
||||
|
||||
// First check pending messages we'll mine this epoch
|
||||
msgs, err := bm.miner.FullNode.MpoolPending(ctx, types.EmptyTSK)
|
||||
require.NoError(bm.t, err)
|
||||
for _, msg := range msgs {
|
||||
tracker.recordIfPost(bm.t, bm, msg)
|
||||
}
|
||||
|
||||
// post not yet in mpool, wait for it
|
||||
if !tracker.done(bm.t) {
|
||||
bm.t.Logf("post missing from mpool, block mining suspended until it arrives")
|
||||
POOL:
|
||||
for {
|
||||
bm.t.Logf("mpool event wait loop at block height %d, ts: %s", ts.Height(), ts.Key())
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt := <-poolEvts:
|
||||
bm.t.Logf("pool event: %d", evt.Type)
|
||||
if evt.Type == api.MpoolAdd {
|
||||
bm.t.Logf("incoming message %v", evt.Message)
|
||||
if tracker.recordIfPost(bm.t, bm, evt.Message) {
|
||||
break POOL
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
bm.t.Logf("done waiting on mpool")
|
||||
}
|
||||
}
|
||||
if ts.Height()+1+abi.ChainEpoch(nulls) >= dlinfo.Last() { // Next block brings us past the last epoch in dline, we need to wait for miner to post
|
||||
bm.forcePoSt(ctx, ts, dlinfo)
|
||||
}
|
||||
|
||||
var target abi.ChainEpoch
|
||||
@ -173,6 +193,12 @@ func (bm *BlockMiner) MineBlocksMustPost(ctx context.Context, blocktime time.Dur
|
||||
Done: reportSuccessFn,
|
||||
})
|
||||
success = <-wait
|
||||
if !success {
|
||||
// if we are mining a new null block and it brings us past deadline boundary we need to wait for miner to post
|
||||
if ts.Height()+1+abi.ChainEpoch(nulls+i) >= dlinfo.Last() {
|
||||
bm.forcePoSt(ctx, ts, dlinfo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait until it shows up on the given full nodes ChainHead
|
||||
|
@ -51,7 +51,7 @@ func TestPaymentChannelsAPI(t *testing.T) {
|
||||
Miner(&miner, &paymentCreator, kit.WithAllSubsystems()).
|
||||
Start().
|
||||
InterconnectAll()
|
||||
bms := ens.BeginMining(blockTime)
|
||||
bms := ens.BeginMiningMustPost(blockTime)
|
||||
bm := bms[0]
|
||||
|
||||
// send some funds to register the receiver
|
||||
|
Loading…
Reference in New Issue
Block a user