Merge branch 'feature/ethutil-refactor' of https://github.com/ethersphere/eth-go into ethersphere-feature/ethutil-refactor

This commit is contained in:
obscuren 2014-08-01 10:22:25 +02:00
commit 8bed47a2d4
11 changed files with 399 additions and 194 deletions

View File

@ -3,6 +3,7 @@ package ethchain
import ( import (
"github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3" "github.com/obscuren/sha3"
"hash" "hash"
@ -14,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW") var powlogger = ethlog.NewLogger("POW")
type PoW interface { type PoW interface {
Search(block *Block, reactChan chan ethutil.React) []byte Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool Verify(hash []byte, diff *big.Int, nonce []byte) bool
GetHashrate() int64 GetHashrate() int64
} }
@ -28,7 +29,7 @@ func (pow *EasyPow) GetHashrate() int64 {
return pow.HashRate return pow.HashRate
} }
func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte { func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano())) r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce() hash := block.HashNoNonce()
diff := block.Difficulty diff := block.Difficulty

View File

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire" "github.com/ethereum/eth-go/ethwire"
@ -36,7 +37,7 @@ type EthManager interface {
BlockChain() *BlockChain BlockChain() *BlockChain
TxPool() *TxPool TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{}) Broadcast(msgType ethwire.MsgType, data []interface{})
Reactor() *ethutil.ReactorEngine Reactor() *ethreact.ReactorEngine
PeerCount() int PeerCount() int
IsMining() bool IsMining() bool
IsListening() bool IsListening() bool

View File

@ -14,6 +14,7 @@ import (
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire" "github.com/ethereum/eth-go/ethwire"
@ -75,7 +76,7 @@ type Ethereum struct {
listening bool listening bool
reactor *ethutil.ReactorEngine reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer RpcServer *ethrpc.JsonRpcServer
@ -113,7 +114,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
clientIdentity: clientIdentity, clientIdentity: clientIdentity,
isUpToDate: true, isUpToDate: true,
} }
ethereum.reactor = ethutil.NewReactorEngine() ethereum.reactor = ethreact.New()
ethereum.txPool = ethchain.NewTxPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum) ethereum.blockChain = ethchain.NewBlockChain(ethereum)
@ -125,7 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil return ethereum, nil
} }
func (s *Ethereum) Reactor() *ethutil.ReactorEngine { func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor return s.reactor
} }
@ -357,6 +358,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
// Start the ethereum // Start the ethereum
func (s *Ethereum) Start(seed bool) { func (s *Ethereum) Start(seed bool) {
s.reactor.Start()
// Bind to addr and port // Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port) ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil { if err != nil {
@ -456,6 +458,8 @@ func (s *Ethereum) Stop() {
} }
s.txPool.Stop() s.txPool.Stop()
s.stateManager.Stop() s.stateManager.Stop()
s.reactor.Flush()
s.reactor.Stop()
ethlogger.Infoln("Server stopped") ethlogger.Infoln("Server stopped")
close(s.shutdownChan) close(s.shutdownChan)

View File

@ -39,7 +39,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage) var logMessages chan (*logMessage)
var logSystems []LogSystem var logSystems []LogSystem
var quit chan bool var quit chan chan error
var drained chan bool
var mutex = sync.Mutex{}
type LogLevel uint8 type LogLevel uint8
@ -52,34 +54,55 @@ const (
DebugDetailLevel DebugDetailLevel
) )
// log messages are dispatched to log writers func dispatch(msg *logMessage) {
func start() {
out:
for {
select {
case msg := <-logMessages:
for _, logSystem := range logSystems { for _, logSystem := range logSystems {
if logSystem.GetLogLevel() >= msg.LogLevel { if logSystem.GetLogLevel() >= msg.LogLevel {
msg.send(logSystem) msg.send(logSystem)
} }
} }
case <-quit: }
break out
// log messages are dispatched to log writers
func start() {
for {
select {
case status := <-quit:
status <- nil
return
case msg := <-logMessages:
dispatch(msg)
default:
drained <- true // this blocks until a message is sent to the queue
} }
} }
} }
func send(msg *logMessage) {
logMessages <- msg
select {
case <-drained:
default:
}
}
func Reset() {
mutex.Lock()
defer mutex.Unlock()
if logSystems != nil {
status := make(chan error)
quit <- status
select {
case <-drained:
default:
}
<-status
}
}
// waits until log messages are drained (dispatched to log writers) // waits until log messages are drained (dispatched to log writers)
func Flush() { func Flush() {
quit <- true if logSystems != nil {
<-drained
done:
for {
select {
case <-logMessages:
default:
break done
}
} }
} }
@ -97,7 +120,8 @@ func AddLogSystem(logSystem LogSystem) {
defer mutex.Unlock() defer mutex.Unlock()
if logSystems == nil { if logSystems == nil {
logMessages = make(chan *logMessage, 10) logMessages = make(chan *logMessage, 10)
quit = make(chan bool, 1) quit = make(chan chan error, 1)
drained = make(chan bool, 1)
go start() go start()
} }
logSystems = append(logSystems, logSystem) logSystems = append(logSystems, logSystem)
@ -106,14 +130,14 @@ func AddLogSystem(logSystem LogSystem) {
func (logger *Logger) sendln(level LogLevel, v ...interface{}) { func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
if logMessages != nil { if logMessages != nil {
msg := newPrintlnLogMessage(level, logger.tag, v...) msg := newPrintlnLogMessage(level, logger.tag, v...)
logMessages <- msg send(msg)
} }
} }
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) { func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
if logMessages != nil { if logMessages != nil {
msg := newPrintfLogMessage(level, logger.tag, format, v...) msg := newPrintfLogMessage(level, logger.tag, format, v...)
logMessages <- msg send(msg)
} }
} }

View File

@ -28,8 +28,19 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level return t.level
} }
func quote(s string) string { func TestLoggerFlush(t *testing.T) {
return fmt.Sprintf("'%s'", s) logger := NewLogger("TEST")
testLogSystem := &TestLogSystem{level: WarnLevel}
AddLogSystem(testLogSystem)
for i := 0; i < 5; i++ {
logger.Errorf(".")
}
Flush()
Reset()
output := testLogSystem.Output
if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." {
t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output)
}
} }
func TestLoggerPrintln(t *testing.T) { func TestLoggerPrintln(t *testing.T) {
@ -41,10 +52,11 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info") logger.Infoln("info")
logger.Debugln("debug") logger.Debugln("debug")
Flush() Flush()
Reset()
output := testLogSystem.Output output := testLogSystem.Output
fmt.Println(quote(output)) fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" { if output != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output)) t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", output)
} }
} }
@ -57,10 +69,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info") logger.Infof("info")
logger.Debugf("debug") logger.Debugf("debug")
Flush() Flush()
Reset()
output := testLogSystem.Output output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" { if output != "[TEST] error to { 2}\n[TEST] warn" {
t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output)) t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", output)
} }
} }
@ -73,13 +85,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error") logger.Errorln("error")
logger.Warnln("warn") logger.Warnln("warn")
Flush() Flush()
Reset()
output0 := testLogSystem0.Output output0 := testLogSystem0.Output
output1 := testLogSystem1.Output output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" { if output0 != "[TEST] error\n" {
t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output)) t.Error("Expected logger 0 output '[TEST] error\\n', got ", output0)
} }
if output1 != "[TEST] error\n[TEST] warn\n" { if output1 != "[TEST] error\n[TEST] warn\n" {
t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output)) t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", output1)
} }
} }
@ -94,9 +107,8 @@ func TestFileLogSystem(t *testing.T) {
Flush() Flush()
contents, _ := ioutil.ReadFile(filename) contents, _ := ioutil.ReadFile(filename)
output := string(contents) output := string(contents)
fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" { if output != "[TEST] error to test.log\n[TEST] warn\n" {
t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output)) t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else { } else {
os.Remove(filename) os.Remove(filename)
} }

View File

@ -4,7 +4,7 @@ import (
"bytes" "bytes"
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire" "github.com/ethereum/eth-go/ethwire"
"sort" "sort"
) )
@ -15,13 +15,13 @@ type Miner struct {
pow ethchain.PoW pow ethchain.PoW
ethereum ethchain.EthManager ethereum ethchain.EthManager
coinbase []byte coinbase []byte
reactChan chan ethutil.React reactChan chan ethreact.Event
txs ethchain.Transactions txs ethchain.Transactions
uncles []*ethchain.Block uncles []*ethchain.Block
block *ethchain.Block block *ethchain.Block
powChan chan []byte powChan chan []byte
powQuitChan chan ethutil.React powQuitChan chan ethreact.Event
quitChan chan bool quitChan chan chan error
} }
func (self *Miner) GetPow() ethchain.PoW { func (self *Miner) GetPow() ethchain.PoW {
@ -29,55 +29,53 @@ func (self *Miner) GetPow() ethchain.PoW {
} }
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) *Miner { func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) *Miner {
reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in miner := Miner{
powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block pow: &ethchain.EasyPow{},
powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread ethereum: ethereum,
quitChan := make(chan bool, 1) coinbase: coinbase,
}
ethereum.Reactor().Subscribe("newBlock", reactChan) return &miner
ethereum.Reactor().Subscribe("newTx:pre", reactChan) }
func (miner *Miner) Start() {
miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block
miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
miner.quitChan = make(chan chan error, 1)
// Insert initial TXs in our little miner 'pool'
miner.txs = miner.ethereum.TxPool().Flush()
miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase)
// Prepare inital block
//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
go miner.listener()
reactor := miner.ethereum.Reactor()
reactor.Subscribe("newBlock", miner.reactChan)
reactor.Subscribe("newTx:pre", miner.reactChan)
// We need the quit chan to be a Reactor event. // We need the quit chan to be a Reactor event.
// The POW search method is actually blocking and if we don't // The POW search method is actually blocking and if we don't
// listen to the reactor events inside of the pow itself // listen to the reactor events inside of the pow itself
// The miner overseer will never get the reactor events themselves // The miner overseer will never get the reactor events themselves
// Only after the miner will find the sha // Only after the miner will find the sha
ethereum.Reactor().Subscribe("newBlock", powQuitChan) reactor.Subscribe("newBlock", miner.powQuitChan)
ethereum.Reactor().Subscribe("newTx:pre", powQuitChan) reactor.Subscribe("newTx:pre", miner.powQuitChan)
miner := Miner{
pow: &ethchain.EasyPow{},
ethereum: ethereum,
coinbase: coinbase,
reactChan: reactChan,
powChan: powChan,
powQuitChan: powQuitChan,
quitChan: quitChan,
}
// Insert initial TXs in our little miner 'pool'
miner.txs = ethereum.TxPool().Flush()
miner.block = ethereum.BlockChain().NewBlock(miner.coinbase)
return &miner
}
func (miner *Miner) Start() {
// Prepare inital block
//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
go miner.listener()
logger.Infoln("Started") logger.Infoln("Started")
miner.ethereum.Reactor().Post("miner:start", miner) reactor.Post("miner:start", miner)
} }
func (miner *Miner) listener() { func (miner *Miner) listener() {
out:
for { for {
select { select {
case <-miner.quitChan: case status := <-miner.quitChan:
logger.Infoln("Stopped") logger.Infoln("Stopped")
break out status <- nil
return
case chanMessage := <-miner.reactChan: case chanMessage := <-miner.reactChan:
if block, ok := chanMessage.Resource.(*ethchain.Block); ok { if block, ok := chanMessage.Resource.(*ethchain.Block); ok {
@ -133,13 +131,22 @@ out:
} }
} }
func (self *Miner) Stop() { func (miner *Miner) Stop() {
logger.Infoln("Stopping...") logger.Infoln("Stopping...")
self.quitChan <- true miner.powQuitChan <- ethreact.Event{}
self.powQuitChan <- ethutil.React{}
self.ethereum.Reactor().Post("miner:stop", self) status := make(chan error)
miner.quitChan <- status
<-status
reactor := miner.ethereum.Reactor()
reactor.Unsubscribe("newBlock", miner.powQuitChan)
reactor.Unsubscribe("newTx:pre", miner.powQuitChan)
reactor.Unsubscribe("newBlock", miner.reactChan)
reactor.Unsubscribe("newTx:pre", miner.reactChan)
reactor.Post("miner:stop", miner)
} }
func (self *Miner) mineNewBlock() { func (self *Miner) mineNewBlock() {

28
ethreact/README.md Normal file
View File

@ -0,0 +1,28 @@
## Reactor
Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state.
Event notification is handled via subscription:
var blockChan = make(chan ethreact.Event, 10)
reactor.Subscribe("newBlock", blockChan)
ethreact.Event broadcast on the channel are
type Event struct {
Resource interface{}
Name string
}
Resource is polimorphic depending on the event type and should be typecast before use, e.g:
b := <-blockChan:
block := b.Resource.(*ethchain.Block)
Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped.
The engine allows arbitrary events to be posted and subscribed to.
ethereum.Reactor().Post("newBlock", newBlock)

182
ethreact/reactor.go Normal file
View File

@ -0,0 +1,182 @@
package ethreact
import (
"github.com/ethereum/eth-go/ethlog"
"sync"
)
var logger = ethlog.NewLogger("REACTOR")
const (
eventBufferSize int = 10
)
type EventHandler struct {
lock sync.RWMutex
name string
chans []chan Event
}
// Post the Event with the reactor resource on the channels
// currently subscribed to the event
func (e *EventHandler) Post(event Event) {
e.lock.RLock()
defer e.lock.RUnlock()
// if we want to preserve order pushing to subscibed channels
// dispatching should be syncrounous
// this means if subscribed event channel is blocked
// the reactor dispatch will be blocked, so we need to mitigate by skipping
// rogue blocking subscribers
for i, ch := range e.chans {
select {
case ch <- event:
default:
logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
}
}
}
// Add a subscriber to this event
func (e *EventHandler) Add(ch chan Event) {
e.lock.Lock()
defer e.lock.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *EventHandler) Remove(ch chan Event) int {
e.lock.Lock()
defer e.lock.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
return len(e.chans)
}
// Basic reactor event
type Event struct {
Resource interface{}
Name string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
lock sync.RWMutex
eventChannel chan Event
eventHandlers map[string]*EventHandler
quit chan chan error
running bool
drained chan bool
}
func New() *ReactorEngine {
return &ReactorEngine{
eventHandlers: make(map[string]*EventHandler),
eventChannel: make(chan Event, eventBufferSize),
quit: make(chan chan error, 1),
drained: make(chan bool, 1),
}
}
func (reactor *ReactorEngine) Start() {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if !reactor.running {
go func() {
for {
select {
case status := <-reactor.quit:
reactor.lock.Lock()
defer reactor.lock.Unlock()
reactor.running = false
logger.Infoln("stopped")
status <- nil
return
case event := <-reactor.eventChannel:
// needs to be called syncronously to keep order of events
reactor.dispatch(event)
default:
reactor.drained <- true // blocking till message is coming in
}
}
}()
reactor.running = true
logger.Infoln("started")
}
}
func (reactor *ReactorEngine) Stop() {
if reactor.running {
status := make(chan error)
reactor.quit <- status
select {
case <-reactor.drained:
default:
}
<-status
}
}
func (reactor *ReactorEngine) Flush() {
<-reactor.drained
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
// Create a new event handler if one isn't available
if eventHandler == nil {
eventHandler = &EventHandler{name: event}
reactor.eventHandlers[event] = eventHandler
}
// Add the events channel to reactor event handler
eventHandler.Add(eventChannel)
logger.Debugf("added new subscription to %s", event)
}
func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
eventHandler := reactor.eventHandlers[event]
if eventHandler != nil {
len := eventHandler.Remove(eventChannel)
if len == 0 {
reactor.eventHandlers[event] = nil
}
logger.Debugf("removed subscription to %s", event)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
reactor.lock.Lock()
defer reactor.lock.Unlock()
if reactor.running {
reactor.eventChannel <- Event{Resource: resource, Name: event}
select {
case <-reactor.drained:
default:
}
}
}
func (reactor *ReactorEngine) dispatch(event Event) {
name := event.Name
eventHandler := reactor.eventHandlers[name]
// if no subscriptions to this event type - no event handler created
// then noone to notify
if eventHandler != nil {
// needs to be called syncronously
eventHandler.Post(event)
}
}

63
ethreact/reactor_test.go Normal file
View File

@ -0,0 +1,63 @@
package ethreact
import (
"fmt"
"testing"
)
func TestReactorAdd(t *testing.T) {
reactor := New()
ch := make(chan Event)
reactor.Subscribe("test", ch)
if reactor.eventHandlers["test"] == nil {
t.Error("Expected new eventHandler to be created")
}
reactor.Unsubscribe("test", ch)
if reactor.eventHandlers["test"] != nil {
t.Error("Expected eventHandler to be removed")
}
}
func TestReactorEvent(t *testing.T) {
var name string
reactor := New()
// Buffer the channel, so it doesn't block for this test
cap := 20
ch := make(chan Event, cap)
reactor.Subscribe("even", ch)
reactor.Subscribe("odd", ch)
reactor.Post("even", "disappears") // should not broadcast if engine not started
reactor.Start()
for i := 0; i < cap; i++ {
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
reactor.Post(name, i)
}
reactor.Post("test", cap) // this should not block
i := 0
reactor.Flush()
close(ch)
for event := range ch {
fmt.Printf("%d: %v", i, event)
if i%2 == 0 {
name = "even"
} else {
name = "odd"
}
if val, ok := event.Resource.(int); ok {
if i != val || event.Name != name {
t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
}
} else {
t.Error("Unable to cast")
}
i++
}
if i != cap {
t.Error("excpected exactly %d events, got ", i)
}
reactor.Stop()
}

View File

@ -1,87 +0,0 @@
package ethutil
import (
"sync"
)
type ReactorEvent struct {
mut sync.Mutex
event string
chans []chan React
}
// Post the specified reactor resource on the channels
// currently subscribed
func (e *ReactorEvent) Post(react React) {
e.mut.Lock()
defer e.mut.Unlock()
for _, ch := range e.chans {
go func(ch chan React) {
ch <- react
}(ch)
}
}
// Add a subscriber to this event
func (e *ReactorEvent) Add(ch chan React) {
e.mut.Lock()
defer e.mut.Unlock()
e.chans = append(e.chans, ch)
}
// Remove a subscriber
func (e *ReactorEvent) Remove(ch chan React) {
e.mut.Lock()
defer e.mut.Unlock()
for i, c := range e.chans {
if c == ch {
e.chans = append(e.chans[:i], e.chans[i+1:]...)
}
}
}
// Basic reactor resource
type React struct {
Resource interface{}
Event string
}
// The reactor basic engine. Acts as bridge
// between the events and the subscribers/posters
type ReactorEngine struct {
patterns map[string]*ReactorEvent
}
func NewReactorEngine() *ReactorEngine {
return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
}
// Subscribe a channel to the specified event
func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
ev := reactor.patterns[event]
// Create a new event if one isn't available
if ev == nil {
ev = &ReactorEvent{event: event}
reactor.patterns[event] = ev
}
// Add the channel to reactor event handler
ev.Add(ch)
}
func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
ev := reactor.patterns[event]
if ev != nil {
ev.Remove(ch)
}
}
func (reactor *ReactorEngine) Post(event string, resource interface{}) {
ev := reactor.patterns[event]
if ev != nil {
ev.Post(React{Resource: resource, Event: event})
}
}

View File

@ -1,30 +0,0 @@
package ethutil
import "testing"
func TestReactorAdd(t *testing.T) {
engine := NewReactorEngine()
ch := make(chan React)
engine.Subscribe("test", ch)
if len(engine.patterns) != 1 {
t.Error("Expected patterns to be 1, got", len(engine.patterns))
}
}
func TestReactorEvent(t *testing.T) {
engine := NewReactorEngine()
// Buffer 1, so it doesn't block for this test
ch := make(chan React, 1)
engine.Subscribe("test", ch)
engine.Post("test", "hello")
value := <-ch
if val, ok := value.Resource.(string); ok {
if val != "hello" {
t.Error("Expected Resource to be 'hello', got", val)
}
} else {
t.Error("Unable to cast")
}
}