Merge pull request #864 from obscuren/filter_changes

xeth, core, event/filter, rpc: new block and transaction filters
This commit is contained in:
Jeffrey Wilcke 2015-05-08 02:56:55 -07:00
commit 637b2415d9
6 changed files with 225 additions and 87 deletions

View File

@ -22,9 +22,9 @@ type Filter struct {
max int max int
topics [][]common.Hash topics [][]common.Hash
BlockCallback func(*types.Block, state.Logs) BlockCallback func(*types.Block, state.Logs)
PendingCallback func(*types.Transaction) TransactionCallback func(*types.Transaction)
LogsCallback func(state.Logs) LogsCallback func(state.Logs)
} }
// Create a new filter which uses a bloom filter on blocks to figure out whether a particular block // Create a new filter which uses a bloom filter on blocks to figure out whether a particular block

View File

@ -204,6 +204,27 @@ func (self *TxPool) AddTransactions(txs []*types.Transaction) {
} }
} }
// GetTransaction allows you to check the pending and queued transaction in the
// transaction pool.
// It has two stategies, first check the pool (map) then check the queue
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
// check the txs first
if tx, ok := tp.txs[hash]; ok {
return tx
}
// check queue
for _, txs := range tp.queue {
for _, tx := range txs {
if tx.Hash() == hash {
return tx
}
}
}
return nil
}
func (self *TxPool) GetTransactions() (txs types.Transactions) { func (self *TxPool) GetTransactions() (txs types.Transactions) {
self.mu.RLock() self.mu.RLock()
defer self.mu.RUnlock() defer self.mu.RUnlock()

View File

@ -88,8 +88,8 @@ out:
case core.TxPreEvent: case core.TxPreEvent:
self.filterMu.RLock() self.filterMu.RLock()
for _, filter := range self.filters { for _, filter := range self.filters {
if filter.PendingCallback != nil { if filter.TransactionCallback != nil {
filter.PendingCallback(event.Tx) filter.TransactionCallback(event.Tx)
} }
} }
self.filterMu.RUnlock() self.filterMu.RUnlock()

View File

@ -322,14 +322,13 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
return err return err
} }
id := api.xeth().RegisterFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics) id := api.xeth().NewLogFilter(args.Earliest, args.Latest, args.Skip, args.Max, args.Address, args.Topics)
*reply = newHexNum(big.NewInt(int64(id)).Bytes()) *reply = newHexNum(big.NewInt(int64(id)).Bytes())
case "eth_newBlockFilter": case "eth_newBlockFilter":
args := new(FilterStringArgs) *reply = newHexNum(api.xeth().NewBlockFilter())
if err := json.Unmarshal(req.Params, &args); err != nil { case "eth_newPendingTransactionFilter":
return err *reply = newHexNum(api.xeth().NewTransactionFilter())
}
*reply = newHexNum(api.xeth().NewFilterString(args.Word))
case "eth_uninstallFilter": case "eth_uninstallFilter":
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
@ -341,7 +340,17 @@ func (api *EthereumApi) GetRequestReply(req *RpcRequest, reply *interface{}) err
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {
return err return err
} }
*reply = NewLogsRes(api.xeth().FilterChanged(args.Id))
switch api.xeth().GetFilterType(args.Id) {
case xeth.BlockFilterTy:
*reply = NewHashesRes(api.xeth().BlockFilterChanged(args.Id))
case xeth.TransactionFilterTy:
*reply = NewHashesRes(api.xeth().TransactionFilterChanged(args.Id))
case xeth.LogFilterTy:
*reply = NewLogsRes(api.xeth().LogFilterChanged(args.Id))
default:
*reply = []string{} // reply empty string slice
}
case "eth_getFilterLogs": case "eth_getFilterLogs":
args := new(FilterIdArgs) args := new(FilterIdArgs)
if err := json.Unmarshal(req.Params, &args); err != nil { if err := json.Unmarshal(req.Params, &args); err != nil {

View File

@ -3,6 +3,7 @@ package rpc
import ( import (
"encoding/json" "encoding/json"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
) )
@ -303,3 +304,13 @@ func NewLogsRes(logs state.Logs) (ls []LogRes) {
return return
} }
func NewHashesRes(hs []common.Hash) []string {
hashes := make([]string, len(hs))
for i, hash := range hs {
hashes[i] = hash.Hex()
}
return hashes
}

View File

@ -29,6 +29,14 @@ var (
defaultGas = big.NewInt(90000) //500000 defaultGas = big.NewInt(90000) //500000
) )
// byte will be inferred
const (
UnknownFilterTy = iota
BlockFilterTy
TransactionFilterTy
LogFilterTy
)
func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) } func DefaultGas() *big.Int { return new(big.Int).Set(defaultGas) }
func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) } func DefaultGasPrice() *big.Int { return new(big.Int).Set(defaultGasPrice) }
@ -42,11 +50,17 @@ type XEth struct {
quit chan struct{} quit chan struct{}
filterManager *filter.FilterManager filterManager *filter.FilterManager
logMut sync.RWMutex logMu sync.RWMutex
logs map[int]*logFilter logQueue map[int]*logQueue
messagesMut sync.RWMutex blockMu sync.RWMutex
messages map[int]*whisperFilter blockQueue map[int]*hashQueue
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
messagesMu sync.RWMutex
messages map[int]*whisperFilter
// regmut sync.Mutex // regmut sync.Mutex
// register map[string][]*interface{} // TODO improve return type // register map[string][]*interface{} // TODO improve return type
@ -59,14 +73,16 @@ type XEth struct {
// confirms all transactions will be used. // confirms all transactions will be used.
func New(eth *eth.Ethereum, frontend Frontend) *XEth { func New(eth *eth.Ethereum, frontend Frontend) *XEth {
xeth := &XEth{ xeth := &XEth{
backend: eth, backend: eth,
frontend: frontend, frontend: frontend,
whisper: NewWhisper(eth.Whisper()), whisper: NewWhisper(eth.Whisper()),
quit: make(chan struct{}), quit: make(chan struct{}),
filterManager: filter.NewFilterManager(eth.EventMux()), filterManager: filter.NewFilterManager(eth.EventMux()),
logs: make(map[int]*logFilter), logQueue: make(map[int]*logQueue),
messages: make(map[int]*whisperFilter), blockQueue: make(map[int]*hashQueue),
agent: miner.NewRemoteAgent(), transactionQueue: make(map[int]*hashQueue),
messages: make(map[int]*whisperFilter),
agent: miner.NewRemoteAgent(),
} }
eth.Miner().Register(xeth.agent) eth.Miner().Register(xeth.agent)
@ -87,23 +103,41 @@ done:
for { for {
select { select {
case <-timer.C: case <-timer.C:
self.logMut.Lock() self.logMu.Lock()
self.messagesMut.Lock() for id, filter := range self.logQueue {
for id, filter := range self.logs {
if time.Since(filter.timeout) > filterTickerTime { if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id) self.filterManager.UninstallFilter(id)
delete(self.logs, id) delete(self.logQueue, id)
} }
} }
self.logMu.Unlock()
self.blockMu.Lock()
for id, filter := range self.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id)
delete(self.blockQueue, id)
}
}
self.blockMu.Unlock()
self.transactionMu.Lock()
for id, filter := range self.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
self.filterManager.UninstallFilter(id)
delete(self.transactionQueue, id)
}
}
self.transactionMu.Unlock()
self.messagesMu.Lock()
for id, filter := range self.messages { for id, filter := range self.messages {
if time.Since(filter.activity()) > filterTickerTime { if time.Since(filter.activity()) > filterTickerTime {
self.Whisper().Unwatch(id) self.Whisper().Unwatch(id)
delete(self.messages, id) delete(self.messages, id)
} }
} }
self.messagesMut.Unlock() self.messagesMu.Unlock()
self.logMut.Unlock()
case <-self.quit: case <-self.quit:
break done break done
} }
@ -201,6 +235,8 @@ func (self *XEth) EthTransactionByHash(hash string) (tx *types.Transaction, blha
data, _ := self.backend.ExtraDb().Get(common.FromHex(hash)) data, _ := self.backend.ExtraDb().Get(common.FromHex(hash))
if len(data) != 0 { if len(data) != 0 {
tx = types.NewTransactionFromBytes(data) tx = types.NewTransactionFromBytes(data)
} else { // check pending transactions
tx = self.backend.TxPool().GetTransaction(common.HexToHash(hash))
} }
// meta // meta
@ -360,7 +396,32 @@ func (self *XEth) SecretToAddress(key string) string {
return common.ToHex(pair.Address()) return common.ToHex(pair.Address())
} }
func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int { func (self *XEth) UninstallFilter(id int) bool {
defer self.filterManager.UninstallFilter(id)
if _, ok := self.logQueue[id]; ok {
self.logMu.Lock()
defer self.logMu.Unlock()
delete(self.logQueue, id)
return true
}
if _, ok := self.blockQueue[id]; ok {
self.blockMu.Lock()
defer self.blockMu.Unlock()
delete(self.blockQueue, id)
return true
}
if _, ok := self.transactionQueue[id]; ok {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
delete(self.transactionQueue, id)
return true
}
return false
}
func (self *XEth) NewLogFilter(earliest, latest int64, skip, max int, address []string, topics [][]string) int {
var id int var id int
filter := core.NewFilter(self.backend) filter := core.NewFilter(self.backend)
filter.SetEarliestBlock(earliest) filter.SetEarliestBlock(earliest)
@ -370,71 +431,90 @@ func (self *XEth) RegisterFilter(earliest, latest int64, skip, max int, address
filter.SetAddress(cAddress(address)) filter.SetAddress(cAddress(address))
filter.SetTopics(cTopics(topics)) filter.SetTopics(cTopics(topics))
filter.LogsCallback = func(logs state.Logs) { filter.LogsCallback = func(logs state.Logs) {
self.logMut.Lock() self.logMu.Lock()
defer self.logMut.Unlock() defer self.logMu.Unlock()
self.logs[id].add(logs...) self.logQueue[id].add(logs...)
} }
id = self.filterManager.InstallFilter(filter) id = self.filterManager.InstallFilter(filter)
self.logs[id] = &logFilter{timeout: time.Now()} self.logQueue[id] = &logQueue{timeout: time.Now()}
return id return id
} }
func (self *XEth) UninstallFilter(id int) bool { func (self *XEth) NewTransactionFilter() int {
if _, ok := self.logs[id]; ok {
delete(self.logs, id)
self.filterManager.UninstallFilter(id)
return true
}
return false
}
func (self *XEth) NewFilterString(word string) int {
var id int var id int
filter := core.NewFilter(self.backend) filter := core.NewFilter(self.backend)
filter.TransactionCallback = func(tx *types.Transaction) {
self.transactionMu.Lock()
defer self.transactionMu.Unlock()
switch word { self.transactionQueue[id].add(tx.Hash())
case "pending":
filter.PendingCallback = func(tx *types.Transaction) {
self.logMut.Lock()
defer self.logMut.Unlock()
self.logs[id].add(&state.Log{})
}
case "latest":
filter.BlockCallback = func(block *types.Block, logs state.Logs) {
self.logMut.Lock()
defer self.logMut.Unlock()
for _, log := range logs {
self.logs[id].add(log)
}
self.logs[id].add(&state.Log{})
}
} }
id = self.filterManager.InstallFilter(filter) id = self.filterManager.InstallFilter(filter)
self.logs[id] = &logFilter{timeout: time.Now()} self.transactionQueue[id] = &hashQueue{timeout: time.Now()}
return id return id
} }
func (self *XEth) FilterChanged(id int) state.Logs { func (self *XEth) NewBlockFilter() int {
self.logMut.Lock() var id int
defer self.logMut.Unlock() filter := core.NewFilter(self.backend)
filter.BlockCallback = func(block *types.Block, logs state.Logs) {
self.blockMu.Lock()
defer self.blockMu.Unlock()
if self.logs[id] != nil { self.blockQueue[id].add(block.Hash())
return self.logs[id].get() }
id = self.filterManager.InstallFilter(filter)
self.blockQueue[id] = &hashQueue{timeout: time.Now()}
return id
}
func (self *XEth) GetFilterType(id int) byte {
if _, ok := self.blockQueue[id]; ok {
return BlockFilterTy
} else if _, ok := self.transactionQueue[id]; ok {
return TransactionFilterTy
} else if _, ok := self.logQueue[id]; ok {
return LogFilterTy
} }
return UnknownFilterTy
}
func (self *XEth) LogFilterChanged(id int) state.Logs {
self.logMu.Lock()
defer self.logMu.Unlock()
if self.logQueue[id] != nil {
return self.logQueue[id].get()
}
return nil
}
func (self *XEth) BlockFilterChanged(id int) []common.Hash {
self.blockMu.Lock()
defer self.blockMu.Unlock()
if self.blockQueue[id] != nil {
return self.blockQueue[id].get()
}
return nil
}
func (self *XEth) TransactionFilterChanged(id int) []common.Hash {
self.blockMu.Lock()
defer self.blockMu.Unlock()
if self.transactionQueue[id] != nil {
return self.transactionQueue[id].get()
}
return nil return nil
} }
func (self *XEth) Logs(id int) state.Logs { func (self *XEth) Logs(id int) state.Logs {
self.logMut.Lock() self.logMu.Lock()
defer self.logMut.Unlock() defer self.logMu.Unlock()
filter := self.filterManager.GetFilter(id) filter := self.filterManager.GetFilter(id)
if filter != nil { if filter != nil {
@ -465,24 +545,24 @@ func (p *XEth) NewWhisperFilter(to, from string, topics [][]string) int {
// Callback to delegate core whisper messages to this xeth filter // Callback to delegate core whisper messages to this xeth filter
callback := func(msg WhisperMessage) { callback := func(msg WhisperMessage) {
p.messagesMut.RLock() // Only read lock to the filter pool p.messagesMu.RLock() // Only read lock to the filter pool
defer p.messagesMut.RUnlock() defer p.messagesMu.RUnlock()
p.messages[id].insert(msg) p.messages[id].insert(msg)
} }
// Initialize the core whisper filter and wrap into xeth // Initialize the core whisper filter and wrap into xeth
id = p.Whisper().Watch(to, from, topics, callback) id = p.Whisper().Watch(to, from, topics, callback)
p.messagesMut.Lock() p.messagesMu.Lock()
p.messages[id] = newWhisperFilter(id, p.Whisper()) p.messages[id] = newWhisperFilter(id, p.Whisper())
p.messagesMut.Unlock() p.messagesMu.Unlock()
return id return id
} }
// UninstallWhisperFilter disables and removes an existing filter. // UninstallWhisperFilter disables and removes an existing filter.
func (p *XEth) UninstallWhisperFilter(id int) bool { func (p *XEth) UninstallWhisperFilter(id int) bool {
p.messagesMut.Lock() p.messagesMu.Lock()
defer p.messagesMut.Unlock() defer p.messagesMu.Unlock()
if _, ok := p.messages[id]; ok { if _, ok := p.messages[id]; ok {
delete(p.messages, id) delete(p.messages, id)
@ -493,8 +573,8 @@ func (p *XEth) UninstallWhisperFilter(id int) bool {
// WhisperMessages retrieves all the known messages that match a specific filter. // WhisperMessages retrieves all the known messages that match a specific filter.
func (self *XEth) WhisperMessages(id int) []WhisperMessage { func (self *XEth) WhisperMessages(id int) []WhisperMessage {
self.messagesMut.RLock() self.messagesMu.RLock()
defer self.messagesMut.RUnlock() defer self.messagesMu.RUnlock()
if self.messages[id] != nil { if self.messages[id] != nil {
return self.messages[id].messages() return self.messages[id].messages()
@ -505,8 +585,8 @@ func (self *XEth) WhisperMessages(id int) []WhisperMessage {
// WhisperMessagesChanged retrieves all the new messages matched by a filter // WhisperMessagesChanged retrieves all the new messages matched by a filter
// since the last retrieval // since the last retrieval
func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage { func (self *XEth) WhisperMessagesChanged(id int) []WhisperMessage {
self.messagesMut.RLock() self.messagesMu.RLock()
defer self.messagesMut.RUnlock() defer self.messagesMu.RUnlock()
if self.messages[id] != nil { if self.messages[id] != nil {
return self.messages[id].retrieve() return self.messages[id].retrieve()
@ -767,19 +847,36 @@ func (m callmsg) Gas() *big.Int { return m.gas }
func (m callmsg) Value() *big.Int { return m.value } func (m callmsg) Value() *big.Int { return m.value }
func (m callmsg) Data() []byte { return m.data } func (m callmsg) Data() []byte { return m.data }
type logFilter struct { type logQueue struct {
logs state.Logs logs state.Logs
timeout time.Time timeout time.Time
id int id int
} }
func (l *logFilter) add(logs ...*state.Log) { func (l *logQueue) add(logs ...*state.Log) {
l.logs = append(l.logs, logs...) l.logs = append(l.logs, logs...)
} }
func (l *logFilter) get() state.Logs { func (l *logQueue) get() state.Logs {
l.timeout = time.Now() l.timeout = time.Now()
tmp := l.logs tmp := l.logs
l.logs = nil l.logs = nil
return tmp return tmp
} }
type hashQueue struct {
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
}