Refactor to use listener

* This removes some duplication between the fake blockchain and
   geth blockchain.
 * This pulls the observers into the blockchain listener
This commit is contained in:
Eric Meyer 2017-11-02 06:41:24 -05:00
parent 60a8be67f4
commit 646e0fa057
9 changed files with 147 additions and 130 deletions

View File

@ -20,19 +20,19 @@ func parseIpcPath(context *do.Context) string {
}
func startBlockchainListener(config cfg.Config, ipcPath string) {
port := config.Database.Port
host := config.Database.Hostname
databaseName := config.Database.Name
var blockchain core.Blockchain = core.NewGethBlockchain(ipcPath)
blockchain.RegisterObserver(core.BlockchainLoggingObserver{})
pgConfig := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", host, port, databaseName)
db, err := sqlx.Connect("postgres", pgConfig)
blockchain := core.NewGethBlockchain(ipcPath)
loggingObserver := core.BlockchainLoggingObserver{}
connectString := cfg.DbConnectionString(cfg.Public().Database)
db, err := sqlx.Connect("postgres", connectString)
if err != nil {
log.Fatalf("Error connecting to DB: %v\n", err)
}
blockchain.RegisterObserver(core.BlockchainDBObserver{Db: db})
blockchain.SubscribeToEvents()
dbObserver := (core.BlockchainDBObserver{Db: db})
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{
loggingObserver,
dbObserver,
})
listener.Start()
}
func tasks(p *do.Project) {

View File

@ -1,6 +1,6 @@
package core
type Blockchain interface {
RegisterObserver(observer BlockchainObserver)
SubscribeToEvents()
SubscribeToBlocks(blocks chan Block)
StartListening()
}

View File

@ -0,0 +1,31 @@
package core
type BlockchainListener struct {
inputBlocks chan Block
blockchain Blockchain
observers []BlockchainObserver
}
func NewBlockchainListener(blockchain Blockchain, observers []BlockchainObserver) BlockchainListener {
inputBlocks := make(chan Block, 10)
blockchain.SubscribeToBlocks(inputBlocks)
listener := BlockchainListener{
inputBlocks: inputBlocks,
blockchain: blockchain,
observers: observers,
}
return listener
}
func (listener BlockchainListener) Start() {
go listener.blockchain.StartListening()
for block := range listener.inputBlocks {
listener.notifyObservers(block)
}
}
func (listener BlockchainListener) notifyObservers(block Block) {
for _, observer := range listener.observers {
observer.NotifyBlockAdded(block)
}
}

View File

@ -0,0 +1,56 @@
package core_test
import (
"github.com/8thlight/vulcanizedb/core"
"github.com/8thlight/vulcanizedb/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("Blockchain listeners", func() {
It("starts with no blocks", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
Expect(len(observer.CurrentBlocks)).To(Equal(0))
close(done)
}, 1)
It("sees when one block was added", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()
go blockchain.AddBlock(core.Block{Number: 123})
wasObserverNotified := <-observer.WasNotified
Expect(wasObserverNotified).To(BeTrue())
Expect(len(observer.CurrentBlocks)).To(Equal(1))
addedBlock := observer.CurrentBlocks[0]
Expect(addedBlock.Number).To(Equal(int64(123)))
close(done)
}, 1)
It("sees a second block", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := &fakes.Blockchain{}
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
go listener.Start()
go blockchain.AddBlock(core.Block{Number: 123})
<-observer.WasNotified
go blockchain.AddBlock(core.Block{Number: 456})
wasObserverNotified := <-observer.WasNotified
Expect(wasObserverNotified).To(BeTrue())
Expect(len(observer.CurrentBlocks)).To(Equal(2))
addedBlock := observer.CurrentBlocks[1]
Expect(addedBlock.Number).To(Equal(int64(456)))
close(done)
}, 1)
})

View File

@ -1,51 +0,0 @@
package core_test
import (
"github.com/8thlight/vulcanizedb/core"
"github.com/8thlight/vulcanizedb/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var _ = Describe("The fake blockchain", func() {
It("conforms to the Blockchain interface", func() {
var blockchain core.Blockchain = &fakes.Blockchain{}
Expect(blockchain).ShouldNot(BeNil())
})
It("lets the only observer know when a block was added", func() {
blockchain := fakes.Blockchain{}
blockchainObserver := &fakes.BlockchainObserver{}
blockchain.RegisterObserver(blockchainObserver)
blockchain.AddBlock(core.Block{})
Expect(blockchainObserver.WasToldBlockAdded()).Should(Equal(true))
})
It("lets the second observer know when a block was added", func() {
blockchain := fakes.Blockchain{}
blockchainObserverOne := &fakes.BlockchainObserver{}
blockchainObserverTwo := &fakes.BlockchainObserver{}
blockchain.RegisterObserver(blockchainObserverOne)
blockchain.RegisterObserver(blockchainObserverTwo)
blockchain.AddBlock(core.Block{})
Expect(blockchainObserverTwo.WasToldBlockAdded()).Should(Equal(true))
})
It("passes the added block to the observer", func() {
blockchain := fakes.Blockchain{}
blockchainObserver := &fakes.BlockchainObserver{}
blockchain.RegisterObserver(blockchainObserver)
blockchain.AddBlock(core.Block{Number: int64(123)})
Expect(blockchainObserver.LastAddedBlock().Number).ShouldNot(BeNil())
Expect(blockchainObserver.LastAddedBlock().Number).Should(Equal(int64(123)))
})
})

View File

@ -2,9 +2,7 @@ package core
import (
"fmt"
"reflect"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/net/context"
@ -12,8 +10,8 @@ import (
type GethBlockchain struct {
client *ethclient.Client
observers []BlockchainObserver
subscription ethereum.Subscription
readGethHeaders chan *types.Header
outputBlocks chan Block
}
func NewGethBlockchain(ipcPath string) *GethBlockchain {
@ -24,25 +22,20 @@ func NewGethBlockchain(ipcPath string) *GethBlockchain {
return &blockchain
}
func (blockchain GethBlockchain) notifyObservers(gethBlock *types.Block) {
block := GethBlockToCoreBlock(gethBlock)
for _, observer := range blockchain.observers {
observer.NotifyBlockAdded(block)
}
}
func (blockchain *GethBlockchain) RegisterObserver(observer BlockchainObserver) {
fmt.Printf("Registering observer: %v\n", reflect.TypeOf(observer))
blockchain.observers = append(blockchain.observers, observer)
}
func (blockchain *GethBlockchain) SubscribeToEvents() {
headers := make(chan *types.Header, 10)
func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan Block) {
blockchain.outputBlocks = blocks
fmt.Println("SubscribeToBlocks")
inputHeaders := make(chan *types.Header, 10)
myContext := context.Background()
sub, _ := blockchain.client.SubscribeNewHead(myContext, headers)
blockchain.subscription = sub
for header := range headers {
blockchain.readGethHeaders = inputHeaders
blockchain.client.SubscribeNewHead(myContext, inputHeaders)
}
func (blockchain *GethBlockchain) StartListening() {
myContext := context.Background()
for header := range blockchain.readGethHeaders {
gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number)
blockchain.notifyObservers(gethBlock)
block := GethBlockToCoreBlock(gethBlock)
blockchain.outputBlocks <- block
}
}

View File

@ -1,21 +1,17 @@
package fakes
import (
"github.com/8thlight/vulcanizedb/core"
)
import "github.com/8thlight/vulcanizedb/core"
type Blockchain struct {
observers []core.BlockchainObserver
outputBlocks chan core.Block
}
func (blockchain *Blockchain) RegisterObserver(observer core.BlockchainObserver) {
blockchain.observers = append(blockchain.observers, observer)
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
blockchain.outputBlocks = outputBlocks
}
func (blockchain *Blockchain) AddBlock(block core.Block) {
for _, observer := range blockchain.observers {
observer.NotifyBlockAdded(block)
}
func (blockchain Blockchain) AddBlock(block core.Block) {
blockchain.outputBlocks <- block
}
func (_ *Blockchain) SubscribeToEvents() {}
func (*Blockchain) StartListening() {}

View File

@ -1,23 +1,23 @@
package fakes
import (
"github.com/8thlight/vulcanizedb/core"
)
import "github.com/8thlight/vulcanizedb/core"
type BlockchainObserver struct {
wasToldBlockAdded bool
blocks []core.Block
CurrentBlocks []core.Block
WasNotified chan bool
}
func (blockchainObserver *BlockchainObserver) WasToldBlockAdded() bool {
return blockchainObserver.wasToldBlockAdded
func (observer *BlockchainObserver) LastBlock() core.Block {
return observer.CurrentBlocks[len(observer.CurrentBlocks)-1]
}
func (blockchainObserver *BlockchainObserver) NotifyBlockAdded(block core.Block) {
blockchainObserver.blocks = append(blockchainObserver.blocks, block)
blockchainObserver.wasToldBlockAdded = true
func NewFakeBlockchainObserverTwo() *BlockchainObserver {
return &BlockchainObserver{
WasNotified: make(chan bool),
}
}
func (observer *BlockchainObserver) LastAddedBlock() core.Block {
return observer.blocks[len(observer.blocks)-1]
func (observer *BlockchainObserver) NotifyBlockAdded(block core.Block) {
observer.CurrentBlocks = append(observer.CurrentBlocks, block)
observer.WasNotified <- true
}

View File

@ -1,48 +1,40 @@
package integration_test
import (
"fmt"
"path"
"path/filepath"
"runtime"
"github.com/8thlight/vulcanizedb/core"
"github.com/8thlight/vulcanizedb/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
var (
_, filename, _, _ = runtime.Caller(0)
basepath = filepath.Dir(filename)
)
func RunTimePath() string {
return path.Join(path.Dir(filename), "../")
}
type ObserverWithChannel struct {
blocks chan core.Block
}
func (observer *ObserverWithChannel) NotifyBlockAdded(block core.Block) {
fmt.Println("Block: ", block.Number)
observer.blocks <- block
}
var _ = Describe("Reading from the Geth blockchain", func() {
It("reads two blocks with incrementing numbers", func(done Done) {
addedBlock := make(chan core.Block, 10)
observer := &ObserverWithChannel{addedBlock}
It("reads two block with listener", func(done Done) {
observer := fakes.NewFakeBlockchainObserverTwo()
blockchain := core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
observers := []core.BlockchainObserver{observer}
listener := core.NewBlockchainListener(blockchain, observers)
go listener.Start()
var blockchain core.Blockchain = core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
blockchain.RegisterObserver(observer)
<-observer.WasNotified
firstBlock := observer.LastBlock()
Expect(firstBlock).NotTo(BeNil())
go blockchain.SubscribeToEvents()
<-observer.WasNotified
secondBlock := observer.LastBlock()
Expect(secondBlock).NotTo(BeNil())
firstBlock := <-addedBlock
Expect(firstBlock).ShouldNot(BeNil())
secondBlock := <-addedBlock
Expect(firstBlock.Number + 1).Should(Equal(secondBlock.Number))
close(done)