eth: fix filter map data race
This commit also documents (but doesn't enforce) that filters are immutable while they're installed. This required a minor API change.
This commit is contained in:
parent
a38dafcc57
commit
e83a999039
33
ethereum.go
33
ethereum.go
@ -95,7 +95,9 @@ type Ethereum struct {
|
|||||||
|
|
||||||
isUpToDate bool
|
isUpToDate bool
|
||||||
|
|
||||||
filters map[int]*ethchain.Filter
|
filterMu sync.RWMutex
|
||||||
|
filterId int
|
||||||
|
filters map[int]*ethchain.Filter
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
|
func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
|
||||||
@ -594,22 +596,29 @@ out:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var filterId = 0
|
// InstallFilter adds filter for blockchain events.
|
||||||
|
// The filter's callbacks will run for matching blocks and messages.
|
||||||
func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) {
|
// The filter should not be modified after it has been installed.
|
||||||
defer func() { filterId++ }()
|
func (self *Ethereum) InstallFilter(filter *ethchain.Filter) (id int) {
|
||||||
|
self.filterMu.Lock()
|
||||||
filter := ethchain.NewFilterFromMap(object, self)
|
id = self.filterId
|
||||||
self.filters[filterId] = filter
|
self.filters[id] = filter
|
||||||
|
self.filterId++
|
||||||
return filter, filterId
|
self.filterMu.Unlock()
|
||||||
|
return id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (self *Ethereum) UninstallFilter(id int) {
|
func (self *Ethereum) UninstallFilter(id int) {
|
||||||
|
self.filterMu.Lock()
|
||||||
delete(self.filters, id)
|
delete(self.filters, id)
|
||||||
|
self.filterMu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetFilter retrieves a filter installed using InstallFilter.
|
||||||
|
// The filter may not be modified.
|
||||||
func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
|
func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
|
||||||
|
self.filterMu.RLock()
|
||||||
|
defer self.filterMu.RUnlock()
|
||||||
return self.filters[id]
|
return self.filters[id]
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -627,14 +636,17 @@ out:
|
|||||||
break out
|
break out
|
||||||
case block := <-blockChan:
|
case block := <-blockChan:
|
||||||
if block, ok := block.Resource.(*ethchain.Block); ok {
|
if block, ok := block.Resource.(*ethchain.Block); ok {
|
||||||
|
self.filterMu.RLock()
|
||||||
for _, filter := range self.filters {
|
for _, filter := range self.filters {
|
||||||
if filter.BlockCallback != nil {
|
if filter.BlockCallback != nil {
|
||||||
filter.BlockCallback(block)
|
filter.BlockCallback(block)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.filterMu.RUnlock()
|
||||||
}
|
}
|
||||||
case msg := <-messageChan:
|
case msg := <-messageChan:
|
||||||
if messages, ok := msg.Resource.(ethstate.Messages); ok {
|
if messages, ok := msg.Resource.(ethstate.Messages); ok {
|
||||||
|
self.filterMu.RLock()
|
||||||
for _, filter := range self.filters {
|
for _, filter := range self.filters {
|
||||||
if filter.MessageCallback != nil {
|
if filter.MessageCallback != nil {
|
||||||
msgs := filter.FilterMessages(messages)
|
msgs := filter.FilterMessages(messages)
|
||||||
@ -643,6 +655,7 @@ out:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.filterMu.RUnlock()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user