fix lifecycle of BlockMiner.

This commit is contained in:
Raúl Kripalani 2021-05-21 13:39:09 +01:00
parent 25daa0c8e4
commit 20dfe220f3
3 changed files with 29 additions and 23 deletions

View File

@ -17,18 +17,19 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestAPI(t *testing.T) {
t.Run("direct", func(t *testing.T) {
runAPITest(t, kit.Builder)
})
t.Run("rpc", func(t *testing.T) {
runAPITest(t, kit.RPCBuilder)
})
}
type apiSuite struct { type apiSuite struct {
makeNodes kit.APIBuilder makeNodes kit.APIBuilder
} }
func TestAPI(t *testing.T) {
runAPITest(t, kit.Builder)
}
func TestAPIRPC(t *testing.T) {
runAPITest(t, kit.RPCBuilder)
}
// runAPITest is the entry point to API test suite // runAPITest is the entry point to API test suite
func runAPITest(t *testing.T, b kit.APIBuilder) { func runAPITest(t *testing.T, b kit.APIBuilder) {
ts := apiSuite{ ts := apiSuite{

View File

@ -2,6 +2,7 @@ package kit
import ( import (
"context" "context"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@ -17,37 +18,45 @@ type BlockMiner struct {
miner TestMiner miner TestMiner
nextNulls int64 nextNulls int64
stopCh chan chan struct{} wg sync.WaitGroup
cancel context.CancelFunc
} }
func NewBlockMiner(t *testing.T, miner TestMiner) *BlockMiner { func NewBlockMiner(t *testing.T, miner TestMiner) *BlockMiner {
return &BlockMiner{ return &BlockMiner{
t: t, t: t,
miner: miner, miner: miner,
stopCh: make(chan chan struct{}), cancel: func() {},
} }
} }
func (bm *BlockMiner) MineBlocks(ctx context.Context, blocktime time.Duration) { func (bm *BlockMiner) MineBlocks(ctx context.Context, blocktime time.Duration) {
time.Sleep(time.Second) time.Sleep(time.Second)
// wrap context in a cancellable context.
ctx, bm.cancel = context.WithCancel(ctx)
bm.wg.Add(1)
go func() { go func() {
defer bm.wg.Done()
for { for {
select { select {
case <-time.After(blocktime): case <-time.After(blocktime):
case <-ctx.Done(): case <-ctx.Done():
return return
case ch := <-bm.stopCh:
close(ch)
close(bm.stopCh)
return
} }
nulls := atomic.SwapInt64(&bm.nextNulls, 0) nulls := atomic.SwapInt64(&bm.nextNulls, 0)
if err := bm.miner.MineOne(ctx, miner.MineReq{ err := bm.miner.MineOne(ctx, miner.MineReq{
InjectNulls: abi.ChainEpoch(nulls), InjectNulls: abi.ChainEpoch(nulls),
Done: func(bool, abi.ChainEpoch, error) {}, Done: func(bool, abi.ChainEpoch, error) {},
}); err != nil { })
switch {
case err == nil: // wrap around
case ctx.Err() != nil: // context fired.
return
default: // log error
bm.t.Error(err) bm.t.Error(err)
} }
} }
@ -110,11 +119,6 @@ func (bm *BlockMiner) MineUntilBlock(ctx context.Context, fn TestFullNode, cb fu
// Stop stops the block miner. // Stop stops the block miner.
func (bm *BlockMiner) Stop() { func (bm *BlockMiner) Stop() {
bm.t.Log("shutting down mining") bm.t.Log("shutting down mining")
if _, ok := <-bm.stopCh; ok { bm.cancel()
// already stopped bm.wg.Wait()
return
}
ch := make(chan struct{})
bm.stopCh <- ch
<-ch
} }

View File

@ -370,6 +370,7 @@ func mockBuilderOpts(t *testing.T, fullOpts []FullNodeOpts, storage []StorageMin
wait.Lock() wait.Lock()
bm := NewBlockMiner(t, miners[0]) bm := NewBlockMiner(t, miners[0])
t.Cleanup(bm.Stop)
bm.MineUntilBlock(ctx, fulls[0], func(epoch abi.ChainEpoch) { bm.MineUntilBlock(ctx, fulls[0], func(epoch abi.ChainEpoch) {
wait.Unlock() wait.Unlock()