Close subscription in integration test

* This was causing issues in future tests,
   since the subscription was causing blocks
   and transactions to be added to the DB
This commit is contained in:
Eric Meyer 2017-11-03 08:54:32 -05:00
parent 4733b25f54
commit 0262a99321
6 changed files with 47 additions and 8 deletions

View File

@ -31,3 +31,7 @@ func (listener BlockchainListener) notifyObservers(block core.Block) {
observer.NotifyBlockAdded(block) observer.NotifyBlockAdded(block)
} }
} }
func (listener BlockchainListener) Stop() {
listener.blockchain.StopListening()
}

View File

@ -54,4 +54,16 @@ var _ = Describe("Blockchain listeners", func() {
close(done) close(done)
}, 1) }, 1)
It("stops listening", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
listener := blockchain_listener.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()
listener.Stop()
Expect(blockchain.WasToldToStop).To(BeTrue())
close(done)
}, 1)
}) })

View File

@ -3,4 +3,5 @@ package core
type Blockchain interface { type Blockchain interface {
SubscribeToBlocks(blocks chan Block) SubscribeToBlocks(blocks chan Block)
StartListening() StartListening()
StopListening()
} }

View File

@ -3,7 +3,8 @@ package fakes
import "github.com/8thlight/vulcanizedb/core" import "github.com/8thlight/vulcanizedb/core"
type Blockchain struct { type Blockchain struct {
outputBlocks chan core.Block outputBlocks chan core.Block
WasToldToStop bool
} }
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) { func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
@ -15,3 +16,7 @@ func (blockchain Blockchain) AddBlock(block core.Block) {
} }
func (*Blockchain) StartListening() {} func (*Blockchain) StartListening() {}
func (blockchain *Blockchain) StopListening() {
blockchain.WasToldToStop = true
}

View File

@ -4,15 +4,17 @@ import (
"fmt" "fmt"
"github.com/8thlight/vulcanizedb/core" "github.com/8thlight/vulcanizedb/core"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
type GethBlockchain struct { type GethBlockchain struct {
client *ethclient.Client client *ethclient.Client
readGethHeaders chan *types.Header readGethHeaders chan *types.Header
outputBlocks chan core.Block outputBlocks chan core.Block
newHeadSubscription ethereum.Subscription
} }
func NewGethBlockchain(ipcPath string) *GethBlockchain { func NewGethBlockchain(ipcPath string) *GethBlockchain {
@ -29,7 +31,8 @@ func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan core.Block) {
inputHeaders := make(chan *types.Header, 10) inputHeaders := make(chan *types.Header, 10)
myContext := context.Background() myContext := context.Background()
blockchain.readGethHeaders = inputHeaders blockchain.readGethHeaders = inputHeaders
blockchain.client.SubscribeNewHead(myContext, inputHeaders) subscription, _ := blockchain.client.SubscribeNewHead(myContext, inputHeaders)
blockchain.newHeadSubscription = subscription
} }
func (blockchain *GethBlockchain) StartListening() { func (blockchain *GethBlockchain) StartListening() {
@ -40,3 +43,7 @@ func (blockchain *GethBlockchain) StartListening() {
blockchain.outputBlocks <- block blockchain.outputBlocks <- block
} }
} }
func (blockchain *GethBlockchain) StopListening() {
blockchain.newHeadSubscription.Unsubscribe()
}

View File

@ -22,11 +22,21 @@ func RunTimePath() string {
var _ = Describe("Reading from the Geth blockchain", func() { var _ = Describe("Reading from the Geth blockchain", func() {
It("reads two blocks", func(done Done) { var listener blockchain_listener.BlockchainListener
observer := fakes.NewFakeBlockchainObserverTwo() var observer *fakes.BlockchainObserver
BeforeEach(func() {
observer = fakes.NewFakeBlockchainObserverTwo()
blockchain := geth.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc") blockchain := geth.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
observers := []core.BlockchainObserver{observer} observers := []core.BlockchainObserver{observer}
listener := blockchain_listener.NewBlockchainListener(blockchain, observers) listener = blockchain_listener.NewBlockchainListener(blockchain, observers)
})
AfterEach(func() {
listener.Stop()
})
It("reads two blocks", func(done Done) {
go listener.Start() go listener.Start()
<-observer.WasNotified <-observer.WasNotified