Merge pull request #57 from fjl/feature/raceless-ethereum-filters
Fix filter map race
This commit is contained in:
		
						commit
						3db6a8e92d
					
				
							
								
								
									
										33
									
								
								ethereum.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								ethereum.go
									
									
									
									
									
								
							@ -95,7 +95,9 @@ type Ethereum struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	isUpToDate bool
 | 
						isUpToDate bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	filters map[int]*ethchain.Filter
 | 
						filterMu sync.RWMutex
 | 
				
			||||||
 | 
						filterId int
 | 
				
			||||||
 | 
						filters  map[int]*ethchain.Filter
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
 | 
					func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager *ethcrypto.KeyManager, caps Caps, usePnp bool) (*Ethereum, error) {
 | 
				
			||||||
@ -594,22 +596,29 @@ out:
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
var filterId = 0
 | 
					// InstallFilter adds filter for blockchain events.
 | 
				
			||||||
 | 
					// The filter's callbacks will run for matching blocks and messages.
 | 
				
			||||||
func (self *Ethereum) InstallFilter(object map[string]interface{}) (*ethchain.Filter, int) {
 | 
					// The filter should not be modified after it has been installed.
 | 
				
			||||||
	defer func() { filterId++ }()
 | 
					func (self *Ethereum) InstallFilter(filter *ethchain.Filter) (id int) {
 | 
				
			||||||
 | 
						self.filterMu.Lock()
 | 
				
			||||||
	filter := ethchain.NewFilterFromMap(object, self)
 | 
						id = self.filterId
 | 
				
			||||||
	self.filters[filterId] = filter
 | 
						self.filters[id] = filter
 | 
				
			||||||
 | 
						self.filterId++
 | 
				
			||||||
	return filter, filterId
 | 
						self.filterMu.Unlock()
 | 
				
			||||||
 | 
						return id
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (self *Ethereum) UninstallFilter(id int) {
 | 
					func (self *Ethereum) UninstallFilter(id int) {
 | 
				
			||||||
 | 
						self.filterMu.Lock()
 | 
				
			||||||
	delete(self.filters, id)
 | 
						delete(self.filters, id)
 | 
				
			||||||
 | 
						self.filterMu.Unlock()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// GetFilter retrieves a filter installed using InstallFilter.
 | 
				
			||||||
 | 
					// The filter may not be modified.
 | 
				
			||||||
func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
 | 
					func (self *Ethereum) GetFilter(id int) *ethchain.Filter {
 | 
				
			||||||
 | 
						self.filterMu.RLock()
 | 
				
			||||||
 | 
						defer self.filterMu.RUnlock()
 | 
				
			||||||
	return self.filters[id]
 | 
						return self.filters[id]
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -627,14 +636,17 @@ out:
 | 
				
			|||||||
			break out
 | 
								break out
 | 
				
			||||||
		case block := <-blockChan:
 | 
							case block := <-blockChan:
 | 
				
			||||||
			if block, ok := block.Resource.(*ethchain.Block); ok {
 | 
								if block, ok := block.Resource.(*ethchain.Block); ok {
 | 
				
			||||||
 | 
									self.filterMu.RLock()
 | 
				
			||||||
				for _, filter := range self.filters {
 | 
									for _, filter := range self.filters {
 | 
				
			||||||
					if filter.BlockCallback != nil {
 | 
										if filter.BlockCallback != nil {
 | 
				
			||||||
						filter.BlockCallback(block)
 | 
											filter.BlockCallback(block)
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									self.filterMu.RUnlock()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		case msg := <-messageChan:
 | 
							case msg := <-messageChan:
 | 
				
			||||||
			if messages, ok := msg.Resource.(ethstate.Messages); ok {
 | 
								if messages, ok := msg.Resource.(ethstate.Messages); ok {
 | 
				
			||||||
 | 
									self.filterMu.RLock()
 | 
				
			||||||
				for _, filter := range self.filters {
 | 
									for _, filter := range self.filters {
 | 
				
			||||||
					if filter.MessageCallback != nil {
 | 
										if filter.MessageCallback != nil {
 | 
				
			||||||
						msgs := filter.FilterMessages(messages)
 | 
											msgs := filter.FilterMessages(messages)
 | 
				
			||||||
@ -643,6 +655,7 @@ out:
 | 
				
			|||||||
						}
 | 
											}
 | 
				
			||||||
					}
 | 
										}
 | 
				
			||||||
				}
 | 
									}
 | 
				
			||||||
 | 
									self.filterMu.RUnlock()
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user