plugeth/eth/filters/api.go
Jeffrey Wilcke 987c1a595a eth/filters: pending logs
Pending logs are now filterable through the Go API. The filter API changed
such that each filter type has its own bucket, and adding a filter
explicitly requires you to specify the bucket to put it in.
2016-02-13 13:14:02 +01:00

599 lines
15 KiB
Go

// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"sync"
"time"
"crypto/rand"
"encoding/hex"
"errors"
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// filterTickerTime is the period of inactivity after which the work loop
// uninstalls a filter. A filter's timestamp is set on creation and refreshed
// whenever its queue is drained.
var filterTickerTime = 5 * time.Minute
// Filter type identifiers. The constants are untyped, so they take the byte
// type wherever they are used as one (e.g. as the result of getFilterType).
const (
	unknownFilterTy     = iota // id is not present in any filter queue
	blockFilterTy              // id belongs to the block queue
	transactionFilterTy        // id belongs to the pending transaction queue
	logFilterTy                // id belongs to the log queue
)
// PublicFilterAPI offers support to create and manage filters. This allows external clients to retrieve various
// information related to the Ethereum protocol such as blocks, transactions and logs.
type PublicFilterAPI struct {
	mux     *event.TypeMux
	quit    chan struct{} // closed by Stop to terminate the work loop
	chainDb ethdb.Database

	filterManager *FilterSystem

	// filterMapping translates the external (string) filter identifiers handed
	// out to clients into the internal (int) identifiers issued by filterManager.
	filterMapMu   sync.RWMutex
	filterMapping map[string]int

	// Per filter type queues, keyed by internal filter identifier. Each map is
	// paired with the mutex declared directly above it.
	logMu    sync.RWMutex
	logQueue map[int]*logQueue

	blockMu    sync.RWMutex
	blockQueue map[int]*hashQueue

	transactionMu    sync.RWMutex
	transactionQueue map[int]*hashQueue

	transactMu sync.Mutex
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance backed by the given
// chain database and event mux. It also starts the background work loop that
// expires idle filters; call Stop to terminate that loop.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
	svc := &PublicFilterAPI{
		mux:     mux,
		chainDb: chainDb,
		// quit must be a real channel: closing a nil channel panics and a
		// receive from a nil channel blocks forever, so without this Stop
		// would crash and the work loop could never exit.
		quit:             make(chan struct{}),
		filterManager:    NewFilterSystem(mux),
		filterMapping:    make(map[string]int),
		logQueue:         make(map[int]*logQueue),
		blockQueue:       make(map[int]*hashQueue),
		transactionQueue: make(map[int]*hashQueue),
	}
	go svc.start()

	return svc
}
// Stop signals the work loop to exit by closing the quit channel. Because a
// channel may only be closed once, Stop must not be called more than once.
func (s *PublicFilterAPI) Stop() {
	close(s.quit)
}
// start runs the work loop. Every two seconds it walks the log, block and
// pending transaction queues and uninstalls each filter whose timestamp is
// older than filterTickerTime. The loop returns once the quit channel is
// closed.
func (s *PublicFilterAPI) start() {
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-s.quit:
			return

		case <-ticker.C:
			// Expire idle log filters.
			s.logMu.Lock()
			for id, q := range s.logQueue {
				if time.Since(q.timeout) > filterTickerTime {
					s.filterManager.Remove(id)
					delete(s.logQueue, id)
				}
			}
			s.logMu.Unlock()

			// Expire idle block filters.
			s.blockMu.Lock()
			for id, q := range s.blockQueue {
				if time.Since(q.timeout) > filterTickerTime {
					s.filterManager.Remove(id)
					delete(s.blockQueue, id)
				}
			}
			s.blockMu.Unlock()

			// Expire idle pending transaction filters.
			s.transactionMu.Lock()
			for id, q := range s.transactionQueue {
				if time.Since(q.timeout) > filterTickerTime {
					s.filterManager.Remove(id)
					delete(s.transactionQueue, id)
				}
			}
			s.transactionMu.Unlock()
		}
	}
}
// NewBlockFilter creates a new filter that collects the hashes of blocks that are included into the canonical chain.
// It returns the external filter identifier that is used to poll or uninstall the filter, or an error when no
// identifier could be generated or the filter could not be registered.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
	externalId, err := newFilterId()
	if err != nil {
		return "", err
	}

	s.blockMu.Lock()
	// Deferred immediately after locking so that the error return below
	// cannot leave blockMu held.
	defer s.blockMu.Unlock()

	filter := New(s.chainDb)
	id, err := s.filterManager.Add(filter, ChainFilter)
	if err != nil {
		return "", err
	}

	s.blockQueue[id] = &hashQueue{timeout: time.Now()}

	// The callback queues the block hash as long as the filter is still installed.
	filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
		s.blockMu.Lock()
		defer s.blockMu.Unlock()

		if queue := s.blockQueue[id]; queue != nil {
			queue.add(block.Hash())
		}
	}

	s.filterMapMu.Lock()
	s.filterMapping[externalId] = id
	s.filterMapMu.Unlock()

	return externalId, nil
}
// NewPendingTransactionFilter installs a filter that collects the hashes of new pending transactions.
// The returned string is the external filter identifier.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
	extID, err := newFilterId()
	if err != nil {
		return "", err
	}

	s.transactionMu.Lock()
	defer s.transactionMu.Unlock()

	f := New(s.chainDb)
	id, err := s.filterManager.Add(f, PendingTxFilter)
	if err != nil {
		return "", err
	}

	s.transactionQueue[id] = &hashQueue{timeout: time.Now()}

	// Queue the transaction hash unless the filter has been removed meanwhile.
	f.TransactionCallback = func(tx *types.Transaction) {
		s.transactionMu.Lock()
		defer s.transactionMu.Unlock()

		q := s.transactionQueue[id]
		if q == nil {
			return
		}
		q.add(tx.Hash())
	}

	s.filterMapMu.Lock()
	s.filterMapping[extID] = id
	s.filterMapMu.Unlock()

	return extID, nil
}
// newLogFilter registers a log filter restricted to the given block range,
// addresses and topics, and returns the internal filter identifier. Matching
// logs are appended to the filter's log queue until the filter is removed.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) (int, error) {
	s.logMu.Lock()
	defer s.logMu.Unlock()

	f := New(s.chainDb)
	id, err := s.filterManager.Add(f, LogFilter)
	if err != nil {
		return 0, err
	}

	s.logQueue[id] = &logQueue{timeout: time.Now()}

	f.SetBeginBlock(earliest)
	f.SetEndBlock(latest)
	f.SetAddresses(addresses)
	f.SetTopics(topics)
	f.LogCallback = func(log *vm.Log, removed bool) {
		s.logMu.Lock()
		defer s.logMu.Unlock()

		q := s.logQueue[id]
		if q == nil {
			return
		}
		q.add(vmlog{log, removed})
	}

	return id, nil
}
// NewFilterArgs holds the decoded parameters of a request to create a new
// log filter or to query logs.
type NewFilterArgs struct {
	FromBlock rpc.BlockNumber  // first block of the range
	ToBlock   rpc.BlockNumber  // last block of the range
	Addresses []common.Address // addresses to match
	Topics    [][]common.Hash  // one list of hashes per topic position
}
// UnmarshalJSON decodes a JSON filter specification into args.
//
// Recognised keys:
//   - "fromBlock", "toBlock": block numbers; a missing or negative value
//     selects rpc.LatestBlockNumber.
//   - "address": a single hex string or an array of hex strings.
//   - "topics": an array whose elements are null, a hex string, or an array
//     of nulls and hex strings. A "topics" value that is not an array is
//     ignored.
//
// Hex strings may carry an optional "0x"/"0X" prefix. An error is returned
// when the JSON is malformed, or when an address or topic has the wrong type
// or is not valid hex.
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
	type input struct {
		From      *rpc.BlockNumber `json:"fromBlock"`
		ToBlock   *rpc.BlockNumber `json:"toBlock"`
		Addresses interface{}      `json:"address"`
		Topics    interface{}      `json:"topics"`
	}

	var raw input
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	if raw.From == nil || raw.From.Int64() < 0 {
		args.FromBlock = rpc.LatestBlockNumber
	} else {
		args.FromBlock = *raw.From
	}

	if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
		args.ToBlock = rpc.LatestBlockNumber
	} else {
		args.ToBlock = *raw.ToBlock
	}

	// decodeHex strips an optional 0x/0X prefix and decodes the remainder.
	decodeHex := func(s string) ([]byte, error) {
		if len(s) >= 2 && s[0] == '0' && (s[1] == 'x' || s[1] == 'X') {
			s = s[2:]
		}
		return hex.DecodeString(s)
	}

	args.Addresses = []common.Address{}

	if raw.Addresses != nil {
		// raw.Addresses can contain a single address or an array of addresses.
		var addresses []common.Address

		switch addrs := raw.Addresses.(type) {
		case []interface{}:
			for i, addr := range addrs {
				strAddr, ok := addr.(string)
				if !ok {
					return fmt.Errorf("invalid address on index %d", i)
				}
				decAddr, err := decodeHex(strAddr)
				if err != nil {
					return errors.New("invalid address given")
				}
				addresses = append(addresses, common.BytesToAddress(decAddr))
			}
		case string:
			decAddr, err := decodeHex(addrs)
			if err != nil {
				return errors.New("invalid address given")
			}
			addresses = append(addresses, common.BytesToAddress(decAddr))
		default:
			return errors.New("invalid address(es) given")
		}

		args.Addresses = addresses
	}

	// topicConverter turns a hex string into a hash; the empty string maps to
	// the zero hash.
	topicConverter := func(s string) (common.Hash, error) {
		if len(s) == 0 {
			return common.Hash{}, nil
		}
		decTopic, err := decodeHex(s)
		if err != nil {
			return common.Hash{}, errors.New("invalid topic given")
		}
		return common.BytesToHash(decTopic), nil
	}

	// topics is an array consisting of strings or arrays of strings
	if raw.Topics != nil {
		topics, ok := raw.Topics.([]interface{})
		if ok {
			parsedTopics := make([][]common.Hash, len(topics))
			for i, topic := range topics {
				switch tp := topic.(type) {
				case nil:
					parsedTopics[i] = []common.Hash{common.StringToHash("")}
				case string:
					t, err := topicConverter(tp)
					if err != nil {
						return fmt.Errorf("invalid topic on index %d", i)
					}
					parsedTopics[i] = []common.Hash{t}
				case []interface{}:
					// Fill a slot per nested entry; each entry is either
					// null or a hex string.
					hashes := make([]common.Hash, len(tp))
					for j, entry := range tp {
						switch e := entry.(type) {
						case nil:
							hashes[j] = common.StringToHash("")
						case string:
							t, err := topicConverter(e)
							if err != nil {
								return fmt.Errorf("invalid topic on index %d", i)
							}
							hashes[j] = t
						default:
							return fmt.Errorf("topic[%d][%d] not a string", i, j)
						}
					}
					parsedTopics[i] = hashes
				default:
					return fmt.Errorf("topic[%d] invalid", i)
				}
			}
			args.Topics = parsedTopics
		}
	}

	return nil
}
// NewFilter creates a new log filter from args and returns its external filter id. The id can be used to
// retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
	extID, err := newFilterId()
	if err != nil {
		return "", err
	}

	// An empty address list is handed on as nil.
	addresses := args.Addresses
	if len(addresses) == 0 {
		addresses = nil
	}

	id, err := s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), addresses, args.Topics)
	if err != nil {
		return "", err
	}

	s.filterMapMu.Lock()
	s.filterMapping[extID] = id
	s.filterMapMu.Unlock()

	return extID, nil
}
// GetLogs runs a one-off filter built from args and returns the logs it
// finds. No filter is installed.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
	from, to := args.FromBlock.Int64(), args.ToBlock.Int64()

	f := New(s.chainDb)
	f.SetBeginBlock(from)
	f.SetEndBlock(to)
	f.SetAddresses(args.Addresses)
	f.SetTopics(args.Topics)

	found := f.Find()
	return toRPCLogs(found, false)
}
// UninstallFilter removes the filter with the given external filter id. It
// returns true when the filter was found in one of the log, block or pending
// transaction queues, and false when the id is unknown or the filter was in
// none of the queues.
//
// Each queue map is only touched while holding its own mutex, and
// filterMapMu is released before any queue mutex is taken. The filter
// creation methods take a queue mutex first and filterMapMu second, so
// holding filterMapMu while waiting for a queue mutex could deadlock.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
	s.filterMapMu.Lock()
	id, ok := s.filterMapping[filterId]
	if ok {
		delete(s.filterMapping, filterId)
	}
	s.filterMapMu.Unlock()

	if !ok {
		return false
	}

	// The filter is removed from the filter manager on every path below,
	// including when it is found in none of the queues.
	defer s.filterManager.Remove(id)

	s.logMu.Lock()
	_, isLog := s.logQueue[id]
	delete(s.logQueue, id)
	s.logMu.Unlock()
	if isLog {
		return true
	}

	s.blockMu.Lock()
	_, isBlock := s.blockQueue[id]
	delete(s.blockQueue, id)
	s.blockMu.Unlock()
	if isBlock {
		return true
	}

	s.transactionMu.Lock()
	_, isTx := s.transactionQueue[id]
	delete(s.transactionQueue, id)
	s.transactionMu.Unlock()

	return isTx
}
// getFilterType reports which queue holds the filter with the given internal
// id: the block queue is checked first, then the pending transaction queue,
// then the log queue. unknownFilterTy is returned when none of them knows the id.
//
// NOTE(review): the queue maps are read here without taking their mutexes;
// confirm whether callers are expected to hold them.
func (s *PublicFilterAPI) getFilterType(id int) byte {
	if _, isBlock := s.blockQueue[id]; isBlock {
		return blockFilterTy
	}
	if _, isTx := s.transactionQueue[id]; isTx {
		return transactionFilterTy
	}
	if _, isLog := s.logQueue[id]; isLog {
		return logFilterTy
	}
	return unknownFilterTy
}
// blockFilterChanged drains and returns the block hashes queued for the block
// filter with the given id. It returns nil when no such filter exists.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
	s.blockMu.Lock()
	defer s.blockMu.Unlock()

	q := s.blockQueue[id]
	if q == nil {
		return nil
	}
	return q.get()
}
// transactionFilterChanged drains and returns the transaction hashes queued
// for the pending transaction filter with the given id. It returns nil when
// no such filter exists.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
	// transactionQueue is guarded by transactionMu (not blockMu), the same
	// mutex the transaction callback and the expiry loop use.
	s.transactionMu.Lock()
	defer s.transactionMu.Unlock()

	if queue := s.transactionQueue[id]; queue != nil {
		return queue.get()
	}
	return nil
}
// logFilterChanged drains and returns the logs queued for the log filter with
// the given id. It returns nil when no such filter exists.
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
	s.logMu.Lock()
	defer s.logMu.Unlock()

	q := s.logQueue[id]
	if q == nil {
		return nil
	}
	return q.get()
}
// GetFilterLogs returns the logs found by the filter with the given external
// filter id. An empty, non-nil slice is returned when the id is unknown or the
// filter manager no longer has the filter.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
	// filterMapping is written under filterMapMu by the filter creation and
	// uninstall methods, so the lookup has to hold the mutex as well.
	s.filterMapMu.RLock()
	id, ok := s.filterMapping[filterId]
	s.filterMapMu.RUnlock()

	if !ok {
		return toRPCLogs(nil, false)
	}

	if filter := s.filterManager.Get(id); filter != nil {
		return toRPCLogs(filter.Find(), false)
	}

	return toRPCLogs(nil, false)
}
// GetFilterChanges returns the data collected for the filter with the given
// external filter id since the last time it was called. This can be used for
// polling.
//
// Block and pending transaction filters yield a slice of hashes, log filters
// a slice of logs. The result is always a non-nil slice so that it encodes as
// a JSON array, also when the filter is unknown or nothing new was queued.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
	s.filterMapMu.Lock()
	id, ok := s.filterMapping[filterId]
	s.filterMapMu.Unlock()

	if !ok { // filter not found
		return []interface{}{}
	}

	switch s.getFilterType(id) {
	case blockFilterTy:
		return returnHashes(s.blockFilterChanged(id))
	case transactionFilterTy:
		return returnHashes(s.transactionFilterChanged(id))
	case logFilterTy:
		// An empty log queue drains to nil, which would encode as JSON null;
		// substitute an empty slice, mirroring what returnHashes does for
		// the hash based filters.
		if logs := s.logFilterChanged(id); logs != nil {
			return logs
		}
		return []vmlog{}
	}

	return []interface{}{}
}
// vmlog wraps a VM log with a flag that is encoded as "removed" in JSON.
type vmlog struct {
	*vm.Log

	Removed bool `json:"removed"`
}
// logQueue buffers the logs collected for one log filter.
type logQueue struct {
	mu sync.Mutex // held by add and get

	logs    []vmlog   // logs queued since the last call to get
	timeout time.Time // refreshed by get; compared against filterTickerTime to expire the filter
	id      int
}
// add appends the given logs to the queue.
func (q *logQueue) add(logs ...vmlog) {
	q.mu.Lock()
	q.logs = append(q.logs, logs...)
	q.mu.Unlock()
}
// get returns the queued logs and empties the queue. It also resets the
// queue's timestamp to the current time.
func (q *logQueue) get() []vmlog {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.timeout = time.Now()

	pending := q.logs
	q.logs = nil

	return pending
}
// hashQueue buffers the hashes collected for one block or pending
// transaction filter.
type hashQueue struct {
	mu sync.Mutex // held by add and get

	hashes  []common.Hash // hashes queued since the last call to get
	timeout time.Time     // refreshed by get; compared against filterTickerTime to expire the filter
	id      int
}
// add appends the given hashes to the queue.
func (q *hashQueue) add(hashes ...common.Hash) {
	q.mu.Lock()
	q.hashes = append(q.hashes, hashes...)
	q.mu.Unlock()
}
// get returns the queued hashes and empties the queue. It also resets the
// queue's timestamp to the current time.
func (q *hashQueue) get() []common.Hash {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.timeout = time.Now()

	pending := q.hashes
	q.hashes = nil

	return pending
}
// newFilterId generates a random filter identifier that can be exposed to the outer world: 16 bytes from
// crypto/rand, hex encoded with a "0x" prefix. Because the identifiers are random it is not feasible for a DApp to
// guess the filter ids of other DApps and uninstall or poll their filters, which would make them miss data.
func newFilterId() (string, error) {
	buf := make([]byte, 16)
	if n, _ := rand.Read(buf); n != len(buf) {
		return "", errors.New("Unable to generate filter id")
	}
	return "0x" + hex.EncodeToString(buf), nil
}
// toRPCLogs converts vm.Logs into a slice of vmlog, a structure that carries
// additional information about each log, namely whether it was removed. All
// entries get the given removed flag. When nil is given an empty, non-nil
// slice is returned, as required by the RPC specification.
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
	out := make([]vmlog, 0, len(logs))
	for _, entry := range logs {
		out = append(out, vmlog{Log: entry, Removed: removed})
	}
	return out
}
// returnHashes returns the given hashes unchanged, except that a nil slice is replaced by an empty one.
// The RPC interface defines that an array is always returned.
func returnHashes(hashes []common.Hash) []common.Hash {
	if hashes != nil {
		return hashes
	}
	return []common.Hash{}
}