Merge pull request #48 from 8thlight/refactor-to-listener
Refactor to use listener
This commit is contained in:
commit
70c34e86ea
@ -20,19 +20,19 @@ func parseIpcPath(context *do.Context) string {
|
||||
}
|
||||
|
||||
func startBlockchainListener(config cfg.Config, ipcPath string) {
|
||||
port := config.Database.Port
|
||||
host := config.Database.Hostname
|
||||
databaseName := config.Database.Name
|
||||
|
||||
var blockchain core.Blockchain = core.NewGethBlockchain(ipcPath)
|
||||
blockchain.RegisterObserver(core.BlockchainLoggingObserver{})
|
||||
pgConfig := fmt.Sprintf("host=%s port=%d dbname=%s sslmode=disable", host, port, databaseName)
|
||||
db, err := sqlx.Connect("postgres", pgConfig)
|
||||
blockchain := core.NewGethBlockchain(ipcPath)
|
||||
loggingObserver := core.BlockchainLoggingObserver{}
|
||||
connectString := cfg.DbConnectionString(cfg.Public().Database)
|
||||
db, err := sqlx.Connect("postgres", connectString)
|
||||
if err != nil {
|
||||
log.Fatalf("Error connecting to DB: %v\n", err)
|
||||
}
|
||||
blockchain.RegisterObserver(core.BlockchainDBObserver{Db: db})
|
||||
blockchain.SubscribeToEvents()
|
||||
dbObserver := (core.BlockchainDBObserver{Db: db})
|
||||
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{
|
||||
loggingObserver,
|
||||
dbObserver,
|
||||
})
|
||||
listener.Start()
|
||||
}
|
||||
|
||||
func tasks(p *do.Project) {
|
||||
|
@ -1,6 +1,6 @@
|
||||
package core
|
||||
|
||||
type Blockchain interface {
|
||||
RegisterObserver(observer BlockchainObserver)
|
||||
SubscribeToEvents()
|
||||
SubscribeToBlocks(blocks chan Block)
|
||||
StartListening()
|
||||
}
|
||||
|
31
core/blockchain_listener.go
Normal file
31
core/blockchain_listener.go
Normal file
@ -0,0 +1,31 @@
|
||||
package core
|
||||
|
||||
type BlockchainListener struct {
|
||||
inputBlocks chan Block
|
||||
blockchain Blockchain
|
||||
observers []BlockchainObserver
|
||||
}
|
||||
|
||||
func NewBlockchainListener(blockchain Blockchain, observers []BlockchainObserver) BlockchainListener {
|
||||
inputBlocks := make(chan Block, 10)
|
||||
blockchain.SubscribeToBlocks(inputBlocks)
|
||||
listener := BlockchainListener{
|
||||
inputBlocks: inputBlocks,
|
||||
blockchain: blockchain,
|
||||
observers: observers,
|
||||
}
|
||||
return listener
|
||||
}
|
||||
|
||||
func (listener BlockchainListener) Start() {
|
||||
go listener.blockchain.StartListening()
|
||||
for block := range listener.inputBlocks {
|
||||
listener.notifyObservers(block)
|
||||
}
|
||||
}
|
||||
|
||||
func (listener BlockchainListener) notifyObservers(block Block) {
|
||||
for _, observer := range listener.observers {
|
||||
observer.NotifyBlockAdded(block)
|
||||
}
|
||||
}
|
56
core/blockchain_listener_test.go
Normal file
56
core/blockchain_listener_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
package core_test
|
||||
|
||||
import (
|
||||
"github.com/8thlight/vulcanizedb/core"
|
||||
"github.com/8thlight/vulcanizedb/fakes"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Blockchain listeners", func() {
|
||||
|
||||
It("starts with no blocks", func(done Done) {
|
||||
observer := fakes.NewFakeBlockchainObserverTwo()
|
||||
blockchain := &fakes.Blockchain{}
|
||||
|
||||
core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
|
||||
|
||||
Expect(len(observer.CurrentBlocks)).To(Equal(0))
|
||||
close(done)
|
||||
}, 1)
|
||||
|
||||
It("sees when one block was added", func(done Done) {
|
||||
observer := fakes.NewFakeBlockchainObserverTwo()
|
||||
blockchain := &fakes.Blockchain{}
|
||||
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
|
||||
go listener.Start()
|
||||
|
||||
go blockchain.AddBlock(core.Block{Number: 123})
|
||||
|
||||
wasObserverNotified := <-observer.WasNotified
|
||||
Expect(wasObserverNotified).To(BeTrue())
|
||||
Expect(len(observer.CurrentBlocks)).To(Equal(1))
|
||||
addedBlock := observer.CurrentBlocks[0]
|
||||
Expect(addedBlock.Number).To(Equal(int64(123)))
|
||||
close(done)
|
||||
}, 1)
|
||||
|
||||
It("sees a second block", func(done Done) {
|
||||
observer := fakes.NewFakeBlockchainObserverTwo()
|
||||
blockchain := &fakes.Blockchain{}
|
||||
listener := core.NewBlockchainListener(blockchain, []core.BlockchainObserver{observer})
|
||||
go listener.Start()
|
||||
|
||||
go blockchain.AddBlock(core.Block{Number: 123})
|
||||
<-observer.WasNotified
|
||||
go blockchain.AddBlock(core.Block{Number: 456})
|
||||
wasObserverNotified := <-observer.WasNotified
|
||||
|
||||
Expect(wasObserverNotified).To(BeTrue())
|
||||
Expect(len(observer.CurrentBlocks)).To(Equal(2))
|
||||
addedBlock := observer.CurrentBlocks[1]
|
||||
Expect(addedBlock.Number).To(Equal(int64(456)))
|
||||
close(done)
|
||||
}, 1)
|
||||
|
||||
})
|
@ -1,51 +0,0 @@
|
||||
package core_test
|
||||
|
||||
import (
|
||||
"github.com/8thlight/vulcanizedb/core"
|
||||
"github.com/8thlight/vulcanizedb/fakes"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("The fake blockchain", func() {
|
||||
|
||||
It("conforms to the Blockchain interface", func() {
|
||||
var blockchain core.Blockchain = &fakes.Blockchain{}
|
||||
Expect(blockchain).ShouldNot(BeNil())
|
||||
})
|
||||
|
||||
It("lets the only observer know when a block was added", func() {
|
||||
blockchain := fakes.Blockchain{}
|
||||
blockchainObserver := &fakes.BlockchainObserver{}
|
||||
blockchain.RegisterObserver(blockchainObserver)
|
||||
|
||||
blockchain.AddBlock(core.Block{})
|
||||
|
||||
Expect(blockchainObserver.WasToldBlockAdded()).Should(Equal(true))
|
||||
})
|
||||
|
||||
It("lets the second observer know when a block was added", func() {
|
||||
blockchain := fakes.Blockchain{}
|
||||
blockchainObserverOne := &fakes.BlockchainObserver{}
|
||||
blockchainObserverTwo := &fakes.BlockchainObserver{}
|
||||
blockchain.RegisterObserver(blockchainObserverOne)
|
||||
blockchain.RegisterObserver(blockchainObserverTwo)
|
||||
|
||||
blockchain.AddBlock(core.Block{})
|
||||
|
||||
Expect(blockchainObserverTwo.WasToldBlockAdded()).Should(Equal(true))
|
||||
})
|
||||
|
||||
It("passes the added block to the observer", func() {
|
||||
blockchain := fakes.Blockchain{}
|
||||
blockchainObserver := &fakes.BlockchainObserver{}
|
||||
blockchain.RegisterObserver(blockchainObserver)
|
||||
|
||||
blockchain.AddBlock(core.Block{Number: int64(123)})
|
||||
|
||||
Expect(blockchainObserver.LastAddedBlock().Number).ShouldNot(BeNil())
|
||||
Expect(blockchainObserver.LastAddedBlock().Number).Should(Equal(int64(123)))
|
||||
})
|
||||
|
||||
})
|
@ -2,9 +2,7 @@ package core
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/ethclient"
|
||||
"golang.org/x/net/context"
|
||||
@ -12,8 +10,8 @@ import (
|
||||
|
||||
type GethBlockchain struct {
|
||||
client *ethclient.Client
|
||||
observers []BlockchainObserver
|
||||
subscription ethereum.Subscription
|
||||
readGethHeaders chan *types.Header
|
||||
outputBlocks chan Block
|
||||
}
|
||||
|
||||
func NewGethBlockchain(ipcPath string) *GethBlockchain {
|
||||
@ -24,25 +22,20 @@ func NewGethBlockchain(ipcPath string) *GethBlockchain {
|
||||
return &blockchain
|
||||
}
|
||||
|
||||
func (blockchain GethBlockchain) notifyObservers(gethBlock *types.Block) {
|
||||
block := GethBlockToCoreBlock(gethBlock)
|
||||
for _, observer := range blockchain.observers {
|
||||
observer.NotifyBlockAdded(block)
|
||||
}
|
||||
}
|
||||
|
||||
func (blockchain *GethBlockchain) RegisterObserver(observer BlockchainObserver) {
|
||||
fmt.Printf("Registering observer: %v\n", reflect.TypeOf(observer))
|
||||
blockchain.observers = append(blockchain.observers, observer)
|
||||
}
|
||||
|
||||
func (blockchain *GethBlockchain) SubscribeToEvents() {
|
||||
headers := make(chan *types.Header, 10)
|
||||
func (blockchain *GethBlockchain) SubscribeToBlocks(blocks chan Block) {
|
||||
blockchain.outputBlocks = blocks
|
||||
fmt.Println("SubscribeToBlocks")
|
||||
inputHeaders := make(chan *types.Header, 10)
|
||||
myContext := context.Background()
|
||||
sub, _ := blockchain.client.SubscribeNewHead(myContext, headers)
|
||||
blockchain.subscription = sub
|
||||
for header := range headers {
|
||||
blockchain.readGethHeaders = inputHeaders
|
||||
blockchain.client.SubscribeNewHead(myContext, inputHeaders)
|
||||
}
|
||||
|
||||
func (blockchain *GethBlockchain) StartListening() {
|
||||
myContext := context.Background()
|
||||
for header := range blockchain.readGethHeaders {
|
||||
gethBlock, _ := blockchain.client.BlockByNumber(myContext, header.Number)
|
||||
blockchain.notifyObservers(gethBlock)
|
||||
block := GethBlockToCoreBlock(gethBlock)
|
||||
blockchain.outputBlocks <- block
|
||||
}
|
||||
}
|
||||
|
@ -1,21 +1,17 @@
|
||||
package fakes
|
||||
|
||||
import (
|
||||
"github.com/8thlight/vulcanizedb/core"
|
||||
)
|
||||
import "github.com/8thlight/vulcanizedb/core"
|
||||
|
||||
type Blockchain struct {
|
||||
observers []core.BlockchainObserver
|
||||
outputBlocks chan core.Block
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) RegisterObserver(observer core.BlockchainObserver) {
|
||||
blockchain.observers = append(blockchain.observers, observer)
|
||||
func (blockchain *Blockchain) SubscribeToBlocks(outputBlocks chan core.Block) {
|
||||
blockchain.outputBlocks = outputBlocks
|
||||
}
|
||||
|
||||
func (blockchain *Blockchain) AddBlock(block core.Block) {
|
||||
for _, observer := range blockchain.observers {
|
||||
observer.NotifyBlockAdded(block)
|
||||
}
|
||||
func (blockchain Blockchain) AddBlock(block core.Block) {
|
||||
blockchain.outputBlocks <- block
|
||||
}
|
||||
|
||||
func (_ *Blockchain) SubscribeToEvents() {}
|
||||
func (*Blockchain) StartListening() {}
|
||||
|
@ -1,23 +1,23 @@
|
||||
package fakes
|
||||
|
||||
import (
|
||||
"github.com/8thlight/vulcanizedb/core"
|
||||
)
|
||||
import "github.com/8thlight/vulcanizedb/core"
|
||||
|
||||
type BlockchainObserver struct {
|
||||
wasToldBlockAdded bool
|
||||
blocks []core.Block
|
||||
CurrentBlocks []core.Block
|
||||
WasNotified chan bool
|
||||
}
|
||||
|
||||
func (blockchainObserver *BlockchainObserver) WasToldBlockAdded() bool {
|
||||
return blockchainObserver.wasToldBlockAdded
|
||||
func (observer *BlockchainObserver) LastBlock() core.Block {
|
||||
return observer.CurrentBlocks[len(observer.CurrentBlocks)-1]
|
||||
}
|
||||
|
||||
func (blockchainObserver *BlockchainObserver) NotifyBlockAdded(block core.Block) {
|
||||
blockchainObserver.blocks = append(blockchainObserver.blocks, block)
|
||||
blockchainObserver.wasToldBlockAdded = true
|
||||
func NewFakeBlockchainObserverTwo() *BlockchainObserver {
|
||||
return &BlockchainObserver{
|
||||
WasNotified: make(chan bool),
|
||||
}
|
||||
}
|
||||
|
||||
func (observer *BlockchainObserver) LastAddedBlock() core.Block {
|
||||
return observer.blocks[len(observer.blocks)-1]
|
||||
func (observer *BlockchainObserver) NotifyBlockAdded(block core.Block) {
|
||||
observer.CurrentBlocks = append(observer.CurrentBlocks, block)
|
||||
observer.WasNotified <- true
|
||||
}
|
||||
|
@ -1,48 +1,40 @@
|
||||
package integration_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
|
||||
"github.com/8thlight/vulcanizedb/core"
|
||||
"github.com/8thlight/vulcanizedb/fakes"
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var (
|
||||
_, filename, _, _ = runtime.Caller(0)
|
||||
basepath = filepath.Dir(filename)
|
||||
)
|
||||
|
||||
func RunTimePath() string {
|
||||
return path.Join(path.Dir(filename), "../")
|
||||
}
|
||||
|
||||
type ObserverWithChannel struct {
|
||||
blocks chan core.Block
|
||||
}
|
||||
|
||||
func (observer *ObserverWithChannel) NotifyBlockAdded(block core.Block) {
|
||||
fmt.Println("Block: ", block.Number)
|
||||
observer.blocks <- block
|
||||
}
|
||||
|
||||
var _ = Describe("Reading from the Geth blockchain", func() {
|
||||
|
||||
It("reads two blocks with incrementing numbers", func(done Done) {
|
||||
addedBlock := make(chan core.Block, 10)
|
||||
observer := &ObserverWithChannel{addedBlock}
|
||||
It("reads two block with listener", func(done Done) {
|
||||
observer := fakes.NewFakeBlockchainObserverTwo()
|
||||
blockchain := core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
|
||||
observers := []core.BlockchainObserver{observer}
|
||||
listener := core.NewBlockchainListener(blockchain, observers)
|
||||
go listener.Start()
|
||||
|
||||
var blockchain core.Blockchain = core.NewGethBlockchain(RunTimePath() + "/test_data_dir/geth.ipc")
|
||||
blockchain.RegisterObserver(observer)
|
||||
<-observer.WasNotified
|
||||
firstBlock := observer.LastBlock()
|
||||
Expect(firstBlock).NotTo(BeNil())
|
||||
|
||||
go blockchain.SubscribeToEvents()
|
||||
<-observer.WasNotified
|
||||
secondBlock := observer.LastBlock()
|
||||
Expect(secondBlock).NotTo(BeNil())
|
||||
|
||||
firstBlock := <-addedBlock
|
||||
Expect(firstBlock).ShouldNot(BeNil())
|
||||
secondBlock := <-addedBlock
|
||||
Expect(firstBlock.Number + 1).Should(Equal(secondBlock.Number))
|
||||
|
||||
close(done)
|
||||
|
Loading…
Reference in New Issue
Block a user