Merge branch 'develop' of github.com-obscure:ethereum/eth-go into develop

This commit is contained in:
obscuren 2014-10-10 16:58:26 +02:00
commit 9b494c6869
3 changed files with 83 additions and 19 deletions

View File

@ -95,6 +95,8 @@ type Ethereum struct {
isUpToDate bool isUpToDate bool
filterMu sync.RWMutex
filterId int
filters map[int]*ethchain.Filter filters map[int]*ethchain.Filter
} }
@ -594,22 +596,29 @@ out:
} }
} }
var filterId = 0 // InstallFilter adds filter for blockchain events.
// The filter's callbacks will run for matching blocks and messages.
func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) { // The filter should not be modified after it has been installed.
defer func() { filterId++ }() func (self *Ethereum) InstallFilter(filter *ethchain.Filter) (id int) {
self.filterMu.Lock()
filter := ethchain.NewFilterFromMap(object, self) id = self.filterId
self.filters[filterId] = filter self.filters[id] = filter
self.filterId++
return filter, filterId self.filterMu.Unlock()
return id
} }
func (self *Ethereum) UninstallFilter(id int) { func (self *Ethereum) UninstallFilter(id int) {
self.filterMu.Lock()
delete(self.filters, id) delete(self.filters, id)
self.filterMu.Unlock()
} }
// GetFilter retrieves a filter installed using InstallFilter.
// The filter may not be modified.
func (self *Ethereum) GetFilter(id int) *ethchain.Filter { func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
self.filterMu.RLock()
defer self.filterMu.RUnlock()
return self.filters[id] return self.filters[id]
} }
@ -627,14 +636,17 @@ out:
break out break out
case block := <-blockChan: case block := <-blockChan:
if block, ok := block.Resource.(*ethchain.Block); ok { if block, ok := block.Resource.(*ethchain.Block); ok {
self.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range self.filters {
if filter.BlockCallback != nil { if filter.BlockCallback != nil {
filter.BlockCallback(block) filter.BlockCallback(block)
} }
} }
self.filterMu.RUnlock()
} }
case msg := <-messageChan: case msg := <-messageChan:
if messages, ok := msg.Resource.(ethstate.Messages); ok { if messages, ok := msg.Resource.(ethstate.Messages); ok {
self.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range self.filters {
if filter.MessageCallback != nil { if filter.MessageCallback != nil {
msgs := filter.FilterMessages(messages) msgs := filter.FilterMessages(messages)
@ -643,6 +655,7 @@ out:
} }
} }
} }
self.filterMu.RUnlock()
} }
} }
} }

View File

@ -1,5 +1,7 @@
package eventer package eventer
import "sync"
// Basic receiver interface. // Basic receiver interface.
type Receiver interface { type Receiver interface {
Send(Event) Send(Event)
@ -27,17 +29,18 @@ type Event struct {
type Channels map[string][]Receiver type Channels map[string][]Receiver
type EventMachine struct { type EventMachine struct {
mu sync.RWMutex
channels Channels channels Channels
} }
func New() *EventMachine { func New() *EventMachine {
return &EventMachine{ return &EventMachine{channels: make(Channels)}
channels: make(Channels),
}
} }
func (self *EventMachine) add(typ string, r Receiver) { func (self *EventMachine) add(typ string, r Receiver) {
self.mu.Lock()
self.channels[typ] = append(self.channels[typ], r) self.channels[typ] = append(self.channels[typ], r)
self.mu.Unlock()
} }
// Generalised methods for the known receiver types // Generalised methods for the known receiver types
@ -64,11 +67,11 @@ func (self *EventMachine) RegisterFunc(typ string, f Function) {
func (self *EventMachine) Register(typ string) Channel { func (self *EventMachine) Register(typ string) Channel {
c := make(Channel, 1) c := make(Channel, 1)
self.add(typ, c) self.add(typ, c)
return c return c
} }
func (self *EventMachine) Post(typ string, data interface{}) { func (self *EventMachine) Post(typ string, data interface{}) {
self.mu.RLock()
if self.channels[typ] != nil { if self.channels[typ] != nil {
ev := Event{typ, data} ev := Event{typ, data}
for _, receiver := range self.channels[typ] { for _, receiver := range self.channels[typ] {
@ -76,4 +79,5 @@ func (self *EventMachine) Post(typ string, data interface{}) {
receiver.Send(ev) receiver.Send(ev)
} }
} }
self.mu.RUnlock()
} }

View File

@ -1,9 +1,13 @@
package eventer package eventer
import "testing" import (
"math/rand"
"testing"
"time"
)
func TestChannel(t *testing.T) { func TestChannel(t *testing.T) {
eventer := New(nil) eventer := New()
c := make(Channel, 1) c := make(Channel, 1)
eventer.RegisterChannel("test", c) eventer.RegisterChannel("test", c)
@ -17,7 +21,7 @@ func TestChannel(t *testing.T) {
} }
func TestFunction(t *testing.T) { func TestFunction(t *testing.T) {
eventer := New(nil) eventer := New()
var data string var data string
eventer.RegisterFunc("test", func(ev Event) { eventer.RegisterFunc("test", func(ev Event) {
@ -31,7 +35,7 @@ func TestFunction(t *testing.T) {
} }
func TestRegister(t *testing.T) { func TestRegister(t *testing.T) {
eventer := New(nil) eventer := New()
c := eventer.Register("test") c := eventer.Register("test")
eventer.Post("test", "hello world") eventer.Post("test", "hello world")
@ -44,7 +48,7 @@ func TestRegister(t *testing.T) {
} }
func TestOn(t *testing.T) { func TestOn(t *testing.T) {
eventer := New(nil) eventer := New()
c := make(Channel, 1) c := make(Channel, 1)
eventer.On("test", c) eventer.On("test", c)
@ -64,3 +68,46 @@ func TestOn(t *testing.T) {
t.Error("Expected function event with data 'hello world'. Got", data) t.Error("Expected function event with data 'hello world'. Got", data)
} }
} }
func TestConcurrentUsage(t *testing.T) {
rand.Seed(time.Now().Unix())
eventer := New()
stop := make(chan struct{})
recv := make(chan int)
poster := func() {
for {
select {
case <-stop:
return
default:
eventer.Post("test", "hi")
}
}
}
listener := func(i int) {
time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond)
c := eventer.Register("test")
// wait for the first event
<-c
recv <- i
// keep receiving to prevent deadlock
for {
select {
case <-stop:
return
case <-c:
}
}
}
nlisteners := 200
go poster()
for i := 0; i < nlisteners; i++ {
go listener(i)
}
// wait until everyone has been served
for i := 0; i < nlisteners; i++ {
<-recv
}
close(stop)
}