From 0262a99321dffaed23fd22c013908f1634715a7f Mon Sep 17 00:00:00 2001 From: Eric Meyer Date: Fri, 3 Nov 2017 08:54:32 -0500 Subject: [PATCH] Close subscription in integration test * This was causing issues in future tests, since the subscription was causing blocks and transactions to be added to the DB --- blockchain_listener/blockchain_listener.go | 4 ++++ blockchain_listener/blockchain_listener_test.go | 12 ++++++++++++ core/blockchain.go | 1 + fakes/blockchain.go | 7 ++++++- geth/geth_blockchain.go | 15 +++++++++++---- integration_test/geth_blockchain_test.go | 16 +++++++++++++--- 6 files changed, 47 insertions(+), 8 deletions(-) diff --git a/blockchain_listener/blockchain_listener.go b/blockchain_listener/blockchain_listener.go index a5734565..73e505b8 100644 --- a/blockchain_listener/blockchain_listener.go +++ b/blockchain_listener/blockchain_listener.go @@ -31,3 +31,7 @@ func (listener BlockchainListener) notifyObservers(block core.Block) { observer.NotifyBlockAdded(block) } } + +func (listener BlockchainListener) Stop() { + listener.blockchain.StopListening() +} diff --git a/blockchain_listener/blockchain_listener_test.go b/blockchain_listener/blockchain_listener_test.go index 787b134f..c64eb5c4 100644 --- a/blockchain_listener/blockchain_listener_test.go +++ b/blockchain_listener/blockchain_listener_test.go @@ -54,4 +54,16 @@ var _ = Describe("Blockchain listeners", func() { close(done) }, 1) + It("stops listening", func(done Done) { + observer := fakes.NewFakeBlockchainObserverTwo() + blockchain := &fakes.Blockchain{} + listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) + go listener.Start() + + listener.Stop() + + Expect(blockchain.WasToldToStop).To(BeTrue()) + close(done) + }, 1) + }) diff --git a/core/blockchain.go b/core/blockchain.go index 72ff5b2c..7cc99c7e 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -3,4 +3,5 @@ package core type Blockchain interface { SubscribeToBlocks(blocks chan Block) StartListening() + StopListening() } diff --git a/fakes/blockchain.go b/fakes/blockchain.go index 577c89fc..bb1cddde 100644 --- a/fakes/blockchain.go +++ b/fakes/blockchain.go @@ -3,7 +3,8 @@ package fakes import "github.com/8thlight/vulcanizedb/core" type Blockchain struct { - outputBlocks chan core.Block + outputBlocks chan core.Block + WasToldToStop bool } func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) { @@ -15,3 +16,7 @@ func (blockchain Blockchain) AddBlock(block core.Block) { } func (*Blockchain) StartListening() {} + +func (blockchain *Blockchain) StopListening() { + blockchain.WasToldToStop = true +} diff --git a/geth/geth_blockchain.go b/geth/geth_blockchain.go index e712ee11..9323d2f7 100644 --- a/geth/geth_blockchain.go +++ b/geth/geth_blockchain.go @@ -4,15 +4,17 @@ import ( "fmt" "github.com/8thlight/vulcanizedb/core" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "golang.org/x/net/context" ) type GethBlockchain struct { - client *ethclient.Client - readGethHeaders chan *types.Header - outputBlocks chan core.Block + client *ethclient.Client + readGethHeaders chan *types.Header + outputBlocks chan core.Block + newHeadSubscription ethereum.Subscription } func NewGethBlockchain(ipcPath string) *GethBlockchain { @@ -29,7 +31,8 @@ func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) { inputHeaders := make(chan *types.Header, 10) myContext := context.Background() blockchain.readGethHeaders = inputHeaders - blockchain.client.SubscribeNewHead(myContext, inputHeaders) + subscription, _ := blockchain.client.SubscribeNewHead(myContext, inputHeaders) + blockchain.newHeadSubscription = subscription } func (blockchain *GethBlockchain) StartListening() { @@ -40,3 +43,7 @@ func (blockchain *GethBlockchain) StartListening() { blockchain.outputBlocks <- block } } + +func (blockchain *GethBlockchain) StopListening() { + blockchain.newHeadSubscription.Unsubscribe() +} diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index cc92710a..199c173f 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -22,11 +22,21 @@ func RunTimePath() string { var _ = Describe("Reading from the Geth blockchain", func() { - It("reads two blocks", func(done Done) { - observer := fakes.NewFakeBlockchainObserverTwo() + var listener blockchain_listener.BlockchainListener + var observer *fakes.BlockchainObserver + + BeforeEach(func() { + observer = fakes.NewFakeBlockchainObserverTwo() blockchain := geth.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc") observers := []core.BlockchainObserver{observer} - listener := blockchain_listener.NewBlockchainListener(blockchain, observers) + listener = blockchain_listener.NewBlockchainListener(blockchain, observers) + }) + + AfterEach(func() { + listener.Stop() + }) + + It("reads two blocks", func(done Done) { go listener.Start() <-observer.WasNotified