From 646e0fa0578cad23dd00b4ca00ced53659d43330 Mon Sep 17 00:00:00 2001 From: Eric Meyer Date: Thu, 2 Nov 2017 06:41:24 -0500 Subject: [PATCH] Refactor to use listener * This removes some duplication between the fake blockchain and geth blockchain. * This pulls the observers into the blockchain listener --- Gododir/main.go | 20 ++++----- core/blockchain.go | 4 +- core/blockchain_listener.go | 31 +++++++++++++ core/blockchain_listener_test.go | 56 ++++++++++++++++++++++++ core/fake_blockchain_test.go | 51 --------------------- core/geth_blockchain.go | 39 +++++++---------- fakes/blockchain.go | 18 +++----- fakes/blockchain_observer.go | 24 +++++----- integration_test/geth_blockchain_test.go | 34 ++++++-------- 9 files changed, 147 insertions(+), 130 deletions(-) create mode 100644 core/blockchain_listener.go create mode 100644 core/blockchain_listener_test.go delete mode 100644 core/fake_blockchain_test.go diff --git a/Gododir/main.go b/Gododir/main.go index 8254201f..1e7a0a55 100644 --- a/Gododir/main.go +++ b/Gododir/main.go @@ -20,19 +20,19 @@ func parseIpcPath(context *do.Context) string { } func startBlockchainListener(config cfg.Config, ipcPath string) { - port := config.Database.Port - host := config.Database.Hostname - databaseName := config.Database.Name - - var blockchain core.Blockchain = core.NewGethBlockchain(ipcPath) - blockchain.RegisterObserver(core.BlockchainLoggingObserver{}) - pgConfig := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", host, port, databaseName) - db, err := sqlx.Connect("postgres", pgConfig) + blockchain := core.NewGethBlockchain(ipcPath) + loggingObserver := core.BlockchainLoggingObserver{} + connectString := cfg.DbConnectionString(cfg.Public().Database) + db, err := sqlx.Connect("postgres", connectString) if err != nil { log.Fatalf("Error connecting to DB: %v\n", err) } - blockchain.RegisterObserver(core.BlockchainDBObserver{Db: db}) - blockchain.SubscribeToEvents() + dbObserver := (core.BlockchainDBObserver{Db: db}) + listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{ + loggingObserver, + dbObserver, + }) + listener.Start() } func tasks(p *do.Project) { diff --git a/core/blockchain.go b/core/blockchain.go index 896fb21e..72ff5b2c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1,6 +1,6 @@ package core type Blockchain interface { - RegisterObserver(observer BlockchainObserver) - SubscribeToEvents() + SubscribeToBlocks(blocks chan Block) + StartListening() } diff --git a/core/blockchain_listener.go b/core/blockchain_listener.go new file mode 100644 index 00000000..a488ab54 --- /dev/null +++ b/core/blockchain_listener.go @@ -0,0 +1,31 @@ +package core + +type BlockchainListener struct { + inputBlocks chan Block + blockchain Blockchain + observers []BlockchainObserver +} + +func NewBlockchainListener(blockchain Blockchain, observers []BlockchainObserver) BlockchainListener { + inputBlocks := make(chan Block, 10) + blockchain.SubscribeToBlocks(inputBlocks) + listener := BlockchainListener{ + inputBlocks: inputBlocks, + blockchain: blockchain, + observers: observers, + } + return listener +} + +func (listener BlockchainListener) Start() { + go listener.blockchain.StartListening() + for block := range listener.inputBlocks { + listener.notifyObservers(block) + } +} + +func (listener BlockchainListener) notifyObservers(block Block) { + for _, observer := range listener.observers { + observer.NotifyBlockAdded(block) + } +} diff --git a/core/blockchain_listener_test.go b/core/blockchain_listener_test.go new file mode 100644 index 00000000..86bf1ad7 --- /dev/null +++ b/core/blockchain_listener_test.go @@ -0,0 +1,56 @@ +package core_test + +import ( + "github.com/8thlight/vulcanizedb/core" + "github.com/8thlight/vulcanizedb/fakes" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Blockchain listeners", func() { + + It("starts with no blocks", func(done Done) { + observer := fakes.NewFakeBlockchainObserverTwo() + blockchain := &fakes.Blockchain{} + + core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) + + Expect(len(observer.CurrentBlocks)).To(Equal(0)) + close(done) + }, 1) + + It("sees when one block was added", func(done Done) { + observer := fakes.NewFakeBlockchainObserverTwo() + blockchain := &fakes.Blockchain{} + listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) + go listener.Start() + + go blockchain.AddBlock(core.Block{Number: 123}) + + wasObserverNotified := <-observer.WasNotified + Expect(wasObserverNotified).To(BeTrue()) + Expect(len(observer.CurrentBlocks)).To(Equal(1)) + addedBlock := observer.CurrentBlocks[0] + Expect(addedBlock.Number).To(Equal(int64(123))) + close(done) + }, 1) + + It("sees a second block", func(done Done) { + observer := fakes.NewFakeBlockchainObserverTwo() + blockchain := &fakes.Blockchain{} + listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer}) + go listener.Start() + + go blockchain.AddBlock(core.Block{Number: 123}) + <-observer.WasNotified + go blockchain.AddBlock(core.Block{Number: 456}) + wasObserverNotified := <-observer.WasNotified + + Expect(wasObserverNotified).To(BeTrue()) + Expect(len(observer.CurrentBlocks)).To(Equal(2)) + addedBlock := observer.CurrentBlocks[1] + Expect(addedBlock.Number).To(Equal(int64(456))) + close(done) + }, 1) + +}) diff --git a/core/fake_blockchain_test.go b/core/fake_blockchain_test.go deleted file mode 100644 index d8b16edf..00000000 --- a/core/fake_blockchain_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package core_test - -import ( - "github.com/8thlight/vulcanizedb/core" - "github.com/8thlight/vulcanizedb/fakes" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" -) - -var _ = Describe("The fake blockchain", func() { - - It("conforms to the Blockchain interface", func() { - var blockchain core.Blockchain = &fakes.Blockchain{} - Expect(blockchain).ShouldNot(BeNil()) - }) - - It("lets the only observer know when a block was added", func() { - blockchain := fakes.Blockchain{} - blockchainObserver := &fakes.BlockchainObserver{} - blockchain.RegisterObserver(blockchainObserver) - - blockchain.AddBlock(core.Block{}) - - Expect(blockchainObserver.WasToldBlockAdded()).Should(Equal(true)) - }) - - It("lets the second observer know when a block was added", func() { - blockchain := fakes.Blockchain{} - blockchainObserverOne := &fakes.BlockchainObserver{} - blockchainObserverTwo := &fakes.BlockchainObserver{} - blockchain.RegisterObserver(blockchainObserverOne) - blockchain.RegisterObserver(blockchainObserverTwo) - - blockchain.AddBlock(core.Block{}) - - Expect(blockchainObserverTwo.WasToldBlockAdded()).Should(Equal(true)) - }) - - It("passes the added block to the observer", func() { - blockchain := fakes.Blockchain{} - blockchainObserver := &fakes.BlockchainObserver{} - blockchain.RegisterObserver(blockchainObserver) - - blockchain.AddBlock(core.Block{Number: int64(123)}) - - Expect(blockchainObserver.LastAddedBlock().Number).ShouldNot(BeNil()) - Expect(blockchainObserver.LastAddedBlock().Number).Should(Equal(int64(123))) - }) - -}) diff --git a/core/geth_blockchain.go b/core/geth_blockchain.go index 827f776f..932ae4ae 100644 --- a/core/geth_blockchain.go +++ b/core/geth_blockchain.go @@ -2,18 +2,16 @@ package core import ( "fmt" - "reflect" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "golang.org/x/net/context" ) type GethBlockchain struct { - client *ethclient.Client - observers []BlockchainObserver - subscription ethereum.Subscription + client *ethclient.Client + readGethHeaders chan *types.Header + outputBlocks chan Block } func NewGethBlockchain(ipcPath string) *GethBlockchain { @@ -24,25 +22,20 @@ func NewGethBlockchain(ipcPath string) *GethBlockchain { return &blockchain } -func (blockchain GethBlockchain) notifyObservers(gethBlock *types.Block) { - block := GethBlockToCoreBlock(gethBlock) - for _, observer := range blockchain.observers { - observer.NotifyBlockAdded(block) - } -} - -func (blockchain *GethBlockchain) RegisterObserver(observer BlockchainObserver) { - fmt.Printf("Registering observer: %v\n", reflect.TypeOf(observer)) - blockchain.observers = append(blockchain.observers, observer) -} - -func (blockchain *GethBlockchain) SubscribeToEvents() { - headers := make(chan *types.Header, 10) +func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan Block) { + blockchain.outputBlocks = blocks + fmt.Println("SubscribeToBlocks") + inputHeaders := make(chan *types.Header, 10) myContext := context.Background() - sub, _ := blockchain.client.SubscribeNewHead(myContext, headers) - blockchain.subscription = sub - for header := range headers { + blockchain.readGethHeaders = inputHeaders + blockchain.client.SubscribeNewHead(myContext, inputHeaders) +} + +func (blockchain *GethBlockchain) StartListening() { + myContext := context.Background() + for header := range blockchain.readGethHeaders { gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number) - blockchain.notifyObservers(gethBlock) + block := GethBlockToCoreBlock(gethBlock) + blockchain.outputBlocks <- block } } diff --git a/fakes/blockchain.go b/fakes/blockchain.go index f88a3c31..577c89fc 100644 --- a/fakes/blockchain.go +++ b/fakes/blockchain.go @@ -1,21 +1,17 @@ package fakes -import ( - "github.com/8thlight/vulcanizedb/core" -) +import "github.com/8thlight/vulcanizedb/core" type Blockchain struct { - observers []core.BlockchainObserver + outputBlocks chan core.Block } -func (blockchain *Blockchain) RegisterObserver(observer core.BlockchainObserver) { - blockchain.observers = append(blockchain.observers, observer) +func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) { + blockchain.outputBlocks = outputBlocks } -func (blockchain *Blockchain) AddBlock(block core.Block) { - for _, observer := range blockchain.observers { - observer.NotifyBlockAdded(block) - } +func (blockchain Blockchain) AddBlock(block core.Block) { + blockchain.outputBlocks <- block } -func (_ *Blockchain) SubscribeToEvents() {} +func (*Blockchain) StartListening() {} diff --git a/fakes/blockchain_observer.go b/fakes/blockchain_observer.go index 461d6484..add572e2 100644 --- a/fakes/blockchain_observer.go +++ b/fakes/blockchain_observer.go @@ -1,23 +1,23 @@ package fakes -import ( - "github.com/8thlight/vulcanizedb/core" -) +import "github.com/8thlight/vulcanizedb/core" type BlockchainObserver struct { - wasToldBlockAdded bool - blocks []core.Block + CurrentBlocks []core.Block + WasNotified chan bool } -func (blockchainObserver *BlockchainObserver) WasToldBlockAdded() bool { - return blockchainObserver.wasToldBlockAdded +func (observer *BlockchainObserver) LastBlock() core.Block { + return observer.CurrentBlocks[len(observer.CurrentBlocks)-1] } -func (blockchainObserver *BlockchainObserver) NotifyBlockAdded(block core.Block) { - blockchainObserver.blocks = append(blockchainObserver.blocks, block) - blockchainObserver.wasToldBlockAdded = true +func NewFakeBlockchainObserverTwo() *BlockchainObserver { + return &BlockchainObserver{ + WasNotified: make(chan bool), + } } -func (observer *BlockchainObserver) LastAddedBlock() core.Block { - return observer.blocks[len(observer.blocks)-1] +func (observer *BlockchainObserver) NotifyBlockAdded(block core.Block) { + observer.CurrentBlocks = append(observer.CurrentBlocks, block) + observer.WasNotified <- true } diff --git a/integration_test/geth_blockchain_test.go b/integration_test/geth_blockchain_test.go index 39e2b61d..9c40d7be 100644 --- a/integration_test/geth_blockchain_test.go +++ b/integration_test/geth_blockchain_test.go @@ -1,48 +1,40 @@ package integration_test import ( - "fmt" "path" - "path/filepath" "runtime" "github.com/8thlight/vulcanizedb/core" + "github.com/8thlight/vulcanizedb/fakes" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" ) var ( _, filename, _, _ = runtime.Caller(0) - basepath = filepath.Dir(filename) ) func RunTimePath() string { return path.Join(path.Dir(filename), "../") } -type ObserverWithChannel struct { - blocks chan core.Block -} - -func (observer *ObserverWithChannel) NotifyBlockAdded(block core.Block) { - fmt.Println("Block: ", block.Number) - observer.blocks <- block -} - var _ = Describe("Reading from the Geth blockchain", func() { - It("reads two blocks with incrementing numbers", func(done Done) { - addedBlock := make(chan core.Block, 10) - observer := &ObserverWithChannel{addedBlock} + It("reads two block with listener", func(done Done) { + observer := fakes.NewFakeBlockchainObserverTwo() + blockchain := core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc") + observers := []core.BlockchainObserver{observer} + listener := core.NewBlockchainListener(blockchain, observers) + go listener.Start() - var blockchain core.Blockchain = core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc") - blockchain.RegisterObserver(observer) + <-observer.WasNotified + firstBlock := observer.LastBlock() + Expect(firstBlock).NotTo(BeNil()) - go blockchain.SubscribeToEvents() + <-observer.WasNotified + secondBlock := observer.LastBlock() + Expect(secondBlock).NotTo(BeNil()) - firstBlock := <-addedBlock - Expect(firstBlock).ShouldNot(BeNil()) - secondBlock := <-addedBlock Expect(firstBlock.Number + 1).Should(Equal(secondBlock.Number)) close(done)