987c1a595a
Pending logs are now filterable through the Go API. Filter API changed such that each filter type has it's own bucket and adding filter explicitly requires you specify the bucket to put it in.
599 lines
15 KiB
Go
599 lines
15 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of go-ethereum.
|
|
//
|
|
// go-ethereum is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// go-ethereum is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package filters
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"errors"
|
|
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/core/vm"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
)
|
|
|
|
var (
|
|
filterTickerTime = 5 * time.Minute
|
|
)
|
|
|
|
// byte will be inferred
|
|
const (
|
|
unknownFilterTy = iota
|
|
blockFilterTy
|
|
transactionFilterTy
|
|
logFilterTy
|
|
)
|
|
|
|
// PublicFilterAPI offers support to create and manage filters. This will allow externa clients to retrieve various
|
|
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
|
type PublicFilterAPI struct {
|
|
mux *event.TypeMux
|
|
|
|
quit chan struct{}
|
|
chainDb ethdb.Database
|
|
|
|
filterManager *FilterSystem
|
|
|
|
filterMapMu sync.RWMutex
|
|
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
|
|
|
|
logMu sync.RWMutex
|
|
logQueue map[int]*logQueue
|
|
|
|
blockMu sync.RWMutex
|
|
blockQueue map[int]*hashQueue
|
|
|
|
transactionMu sync.RWMutex
|
|
transactionQueue map[int]*hashQueue
|
|
|
|
transactMu sync.Mutex
|
|
}
|
|
|
|
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
|
|
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
|
|
svc := &PublicFilterAPI{
|
|
mux: mux,
|
|
chainDb: chainDb,
|
|
filterManager: NewFilterSystem(mux),
|
|
filterMapping: make(map[string]int),
|
|
logQueue: make(map[int]*logQueue),
|
|
blockQueue: make(map[int]*hashQueue),
|
|
transactionQueue: make(map[int]*hashQueue),
|
|
}
|
|
go svc.start()
|
|
return svc
|
|
}
|
|
|
|
// Stop quits the work loop.
|
|
func (s *PublicFilterAPI) Stop() {
|
|
close(s.quit)
|
|
}
|
|
|
|
// start the work loop, wait and process events.
|
|
func (s *PublicFilterAPI) start() {
|
|
timer := time.NewTicker(2 * time.Second)
|
|
defer timer.Stop()
|
|
done:
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
s.logMu.Lock()
|
|
for id, filter := range s.logQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.logQueue, id)
|
|
}
|
|
}
|
|
s.logMu.Unlock()
|
|
|
|
s.blockMu.Lock()
|
|
for id, filter := range s.blockQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.blockQueue, id)
|
|
}
|
|
}
|
|
s.blockMu.Unlock()
|
|
|
|
s.transactionMu.Lock()
|
|
for id, filter := range s.transactionQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.transactionQueue, id)
|
|
}
|
|
}
|
|
s.transactionMu.Unlock()
|
|
case <-s.quit:
|
|
break done
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
|
|
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.blockMu.Lock()
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, ChainFilter)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
|
|
|
|
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if queue := s.blockQueue[id]; queue != nil {
|
|
queue.add(block.Hash())
|
|
}
|
|
}
|
|
|
|
defer s.blockMu.Unlock()
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
|
|
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, PendingTxFilter)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
|
|
|
|
filter.TransactionCallback = func(tx *types.Transaction) {
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
|
|
if queue := s.transactionQueue[id]; queue != nil {
|
|
queue.add(tx.Hash())
|
|
}
|
|
}
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// newLogFilter creates a new log filter.
|
|
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, LogFilter)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
s.logQueue[id] = &logQueue{timeout: time.Now()}
|
|
|
|
filter.SetBeginBlock(earliest)
|
|
filter.SetEndBlock(latest)
|
|
filter.SetAddresses(addresses)
|
|
filter.SetTopics(topics)
|
|
filter.LogCallback = func(log *vm.Log, removed bool) {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
|
|
if queue := s.logQueue[id]; queue != nil {
|
|
queue.add(vmlog{log, removed})
|
|
}
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
// NewFilterArgs represents a request to create a new filter.
|
|
type NewFilterArgs struct {
|
|
FromBlock rpc.BlockNumber
|
|
ToBlock rpc.BlockNumber
|
|
Addresses []common.Address
|
|
Topics [][]common.Hash
|
|
}
|
|
|
|
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
|
|
type input struct {
|
|
From *rpc.BlockNumber `json:"fromBlock"`
|
|
ToBlock *rpc.BlockNumber `json:"toBlock"`
|
|
Addresses interface{} `json:"address"`
|
|
Topics interface{} `json:"topics"`
|
|
}
|
|
|
|
var raw input
|
|
if err := json.Unmarshal(data, &raw); err != nil {
|
|
return err
|
|
}
|
|
|
|
if raw.From == nil || raw.From.Int64() < 0 {
|
|
args.FromBlock = rpc.LatestBlockNumber
|
|
} else {
|
|
args.FromBlock = *raw.From
|
|
}
|
|
|
|
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
|
|
args.ToBlock = rpc.LatestBlockNumber
|
|
} else {
|
|
args.ToBlock = *raw.ToBlock
|
|
}
|
|
|
|
args.Addresses = []common.Address{}
|
|
|
|
if raw.Addresses != nil {
|
|
// raw.Address can contain a single address or an array of addresses
|
|
var addresses []common.Address
|
|
|
|
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
|
|
for i, addr := range strAddrs {
|
|
if strAddr, ok := addr.(string); ok {
|
|
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
|
|
strAddr = strAddr[2:]
|
|
}
|
|
if decAddr, err := hex.DecodeString(strAddr); err == nil {
|
|
addresses = append(addresses, common.BytesToAddress(decAddr))
|
|
} else {
|
|
fmt.Errorf("invalid address given")
|
|
}
|
|
} else {
|
|
return fmt.Errorf("invalid address on index %d", i)
|
|
}
|
|
}
|
|
} else if singleAddr, ok := raw.Addresses.(string); ok {
|
|
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
|
|
singleAddr = singleAddr[2:]
|
|
}
|
|
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
|
|
addresses = append(addresses, common.BytesToAddress(decAddr))
|
|
} else {
|
|
fmt.Errorf("invalid address given")
|
|
}
|
|
} else {
|
|
errors.New("invalid address(es) given")
|
|
}
|
|
args.Addresses = addresses
|
|
}
|
|
|
|
topicConverter := func(raw string) (common.Hash, error) {
|
|
if len(raw) == 0 {
|
|
return common.Hash{}, nil
|
|
}
|
|
|
|
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
|
|
raw = raw[2:]
|
|
}
|
|
|
|
if decAddr, err := hex.DecodeString(raw); err == nil {
|
|
return common.BytesToHash(decAddr), nil
|
|
}
|
|
|
|
return common.Hash{}, errors.New("invalid topic given")
|
|
}
|
|
|
|
// topics is an array consisting of strings or arrays of strings
|
|
if raw.Topics != nil {
|
|
topics, ok := raw.Topics.([]interface{})
|
|
if ok {
|
|
parsedTopics := make([][]common.Hash, len(topics))
|
|
for i, topic := range topics {
|
|
if topic == nil {
|
|
parsedTopics[i] = []common.Hash{common.StringToHash("")}
|
|
} else if strTopic, ok := topic.(string); ok {
|
|
if t, err := topicConverter(strTopic); err != nil {
|
|
return fmt.Errorf("invalid topic on index %d", i)
|
|
} else {
|
|
parsedTopics[i] = []common.Hash{t}
|
|
}
|
|
} else if arrTopic, ok := topic.([]interface{}); ok {
|
|
parsedTopics[i] = make([]common.Hash, len(arrTopic))
|
|
for j := 0; j < len(parsedTopics[i]); i++ {
|
|
if arrTopic[j] == nil {
|
|
parsedTopics[i][j] = common.StringToHash("")
|
|
} else if str, ok := arrTopic[j].(string); ok {
|
|
if t, err := topicConverter(str); err != nil {
|
|
return fmt.Errorf("invalid topic on index %d", i)
|
|
} else {
|
|
parsedTopics[i] = []common.Hash{t}
|
|
}
|
|
} else {
|
|
fmt.Errorf("topic[%d][%d] not a string", i, j)
|
|
}
|
|
}
|
|
} else {
|
|
return fmt.Errorf("topic[%d] invalid", i)
|
|
}
|
|
}
|
|
args.Topics = parsedTopics
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
|
|
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var id int
|
|
if len(args.Addresses) > 0 {
|
|
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
|
|
} else {
|
|
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// GetLogs returns the logs matching the given argument.
|
|
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
|
|
filter := New(s.chainDb)
|
|
filter.SetBeginBlock(args.FromBlock.Int64())
|
|
filter.SetEndBlock(args.ToBlock.Int64())
|
|
filter.SetAddresses(args.Addresses)
|
|
filter.SetTopics(args.Topics)
|
|
|
|
return toRPCLogs(filter.Find(), false)
|
|
}
|
|
|
|
// UninstallFilter removes the filter with the given filter id.
|
|
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
|
|
s.filterMapMu.Lock()
|
|
defer s.filterMapMu.Unlock()
|
|
|
|
id, ok := s.filterMapping[filterId]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
defer s.filterManager.Remove(id)
|
|
delete(s.filterMapping, filterId)
|
|
|
|
if _, ok := s.logQueue[id]; ok {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
delete(s.logQueue, id)
|
|
return true
|
|
}
|
|
if _, ok := s.blockQueue[id]; ok {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
delete(s.blockQueue, id)
|
|
return true
|
|
}
|
|
if _, ok := s.transactionQueue[id]; ok {
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
delete(s.transactionQueue, id)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// getFilterType is a helper utility that determine the type of filter for the given filter id.
|
|
func (s *PublicFilterAPI) getFilterType(id int) byte {
|
|
if _, ok := s.blockQueue[id]; ok {
|
|
return blockFilterTy
|
|
} else if _, ok := s.transactionQueue[id]; ok {
|
|
return transactionFilterTy
|
|
} else if _, ok := s.logQueue[id]; ok {
|
|
return logFilterTy
|
|
}
|
|
|
|
return unknownFilterTy
|
|
}
|
|
|
|
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
|
|
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if s.blockQueue[id] != nil {
|
|
return s.blockQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// transactionFilterChanged returns a collection of transaction hashes for the pending
|
|
// transaction filter with the given id.
|
|
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if s.transactionQueue[id] != nil {
|
|
return s.transactionQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// logFilterChanged returns a collection of logs for the log filter with the given id.
|
|
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
|
|
if s.logQueue[id] != nil {
|
|
return s.logQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetFilterLogs returns the logs for the filter with the given id.
|
|
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
|
|
id, ok := s.filterMapping[filterId]
|
|
if !ok {
|
|
return toRPCLogs(nil, false)
|
|
}
|
|
|
|
if filter := s.filterManager.Get(id); filter != nil {
|
|
return toRPCLogs(filter.Find(), false)
|
|
}
|
|
|
|
return toRPCLogs(nil, false)
|
|
}
|
|
|
|
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
|
|
// This can be used for polling.
|
|
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
|
|
s.filterMapMu.Lock()
|
|
id, ok := s.filterMapping[filterId]
|
|
s.filterMapMu.Unlock()
|
|
|
|
if !ok { // filter not found
|
|
return []interface{}{}
|
|
}
|
|
|
|
switch s.getFilterType(id) {
|
|
case blockFilterTy:
|
|
return returnHashes(s.blockFilterChanged(id))
|
|
case transactionFilterTy:
|
|
return returnHashes(s.transactionFilterChanged(id))
|
|
case logFilterTy:
|
|
return s.logFilterChanged(id)
|
|
}
|
|
|
|
return []interface{}{}
|
|
}
|
|
|
|
type vmlog struct {
|
|
*vm.Log
|
|
Removed bool `json:"removed"`
|
|
}
|
|
|
|
type logQueue struct {
|
|
mu sync.Mutex
|
|
|
|
logs []vmlog
|
|
timeout time.Time
|
|
id int
|
|
}
|
|
|
|
func (l *logQueue) add(logs ...vmlog) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.logs = append(l.logs, logs...)
|
|
}
|
|
|
|
func (l *logQueue) get() []vmlog {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.timeout = time.Now()
|
|
tmp := l.logs
|
|
l.logs = nil
|
|
return tmp
|
|
}
|
|
|
|
type hashQueue struct {
|
|
mu sync.Mutex
|
|
|
|
hashes []common.Hash
|
|
timeout time.Time
|
|
id int
|
|
}
|
|
|
|
func (l *hashQueue) add(hashes ...common.Hash) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.hashes = append(l.hashes, hashes...)
|
|
}
|
|
|
|
func (l *hashQueue) get() []common.Hash {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.timeout = time.Now()
|
|
tmp := l.hashes
|
|
l.hashes = nil
|
|
return tmp
|
|
}
|
|
|
|
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
|
|
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
|
|
// causing the affected DApp to miss data.
|
|
func newFilterId() (string, error) {
|
|
var subid [16]byte
|
|
n, _ := rand.Read(subid[:])
|
|
if n != 16 {
|
|
return "", errors.New("Unable to generate filter id")
|
|
}
|
|
return "0x" + hex.EncodeToString(subid[:]), nil
|
|
}
|
|
|
|
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
|
|
// can hold additional information about the logs such as whether it was deleted.
|
|
// Additionally when nil is given it will by default instead create an empty slice
|
|
// instead. This is required by the RPC specification.
|
|
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
|
|
convertedLogs := make([]vmlog, len(logs))
|
|
for i, log := range logs {
|
|
convertedLogs[i] = vmlog{Log: log, Removed: removed}
|
|
}
|
|
return convertedLogs
|
|
}
|
|
|
|
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
|
|
// return the given hashes. The RPC interfaces defines that always an array is returned.
|
|
func returnHashes(hashes []common.Hash) []common.Hash {
|
|
if hashes == nil {
|
|
return []common.Hash{}
|
|
}
|
|
return hashes
|
|
}
|