652 lines
16 KiB
Go
652 lines
16 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of go-ethereum.
|
|
//
|
|
// go-ethereum is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// go-ethereum is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU General Public License
|
|
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package filters
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/core/vm"
|
|
"github.com/ethereum/go-ethereum/ethdb"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
var (
|
|
filterTickerTime = 5 * time.Minute
|
|
)
|
|
|
|
// byte will be inferred
|
|
const (
|
|
unknownFilterTy = iota
|
|
blockFilterTy
|
|
transactionFilterTy
|
|
logFilterTy
|
|
)
|
|
|
|
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
|
|
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
|
type PublicFilterAPI struct {
|
|
mux *event.TypeMux
|
|
|
|
quit chan struct{}
|
|
chainDb ethdb.Database
|
|
|
|
filterManager *FilterSystem
|
|
|
|
filterMapMu sync.RWMutex
|
|
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
|
|
|
|
logMu sync.RWMutex
|
|
logQueue map[int]*logQueue
|
|
|
|
blockMu sync.RWMutex
|
|
blockQueue map[int]*hashQueue
|
|
|
|
transactionMu sync.RWMutex
|
|
transactionQueue map[int]*hashQueue
|
|
|
|
transactMu sync.Mutex
|
|
}
|
|
|
|
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
|
|
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
|
|
svc := &PublicFilterAPI{
|
|
mux: mux,
|
|
chainDb: chainDb,
|
|
filterManager: NewFilterSystem(mux),
|
|
filterMapping: make(map[string]int),
|
|
logQueue: make(map[int]*logQueue),
|
|
blockQueue: make(map[int]*hashQueue),
|
|
transactionQueue: make(map[int]*hashQueue),
|
|
}
|
|
go svc.start()
|
|
return svc
|
|
}
|
|
|
|
// Stop quits the work loop.
|
|
func (s *PublicFilterAPI) Stop() {
|
|
close(s.quit)
|
|
}
|
|
|
|
// start the work loop, wait and process events.
|
|
func (s *PublicFilterAPI) start() {
|
|
timer := time.NewTicker(2 * time.Second)
|
|
defer timer.Stop()
|
|
done:
|
|
for {
|
|
select {
|
|
case <-timer.C:
|
|
s.logMu.Lock()
|
|
for id, filter := range s.logQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.logQueue, id)
|
|
}
|
|
}
|
|
s.logMu.Unlock()
|
|
|
|
s.blockMu.Lock()
|
|
for id, filter := range s.blockQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.blockQueue, id)
|
|
}
|
|
}
|
|
s.blockMu.Unlock()
|
|
|
|
s.transactionMu.Lock()
|
|
for id, filter := range s.transactionQueue {
|
|
if time.Since(filter.timeout) > filterTickerTime {
|
|
s.filterManager.Remove(id)
|
|
delete(s.transactionQueue, id)
|
|
}
|
|
}
|
|
s.transactionMu.Unlock()
|
|
case <-s.quit:
|
|
break done
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
|
|
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.blockMu.Lock()
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, ChainFilter)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
|
|
|
|
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if queue := s.blockQueue[id]; queue != nil {
|
|
queue.add(block.Hash())
|
|
}
|
|
}
|
|
|
|
defer s.blockMu.Unlock()
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
|
|
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, PendingTxFilter)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
|
|
|
|
filter.TransactionCallback = func(tx *types.Transaction) {
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
|
|
if queue := s.transactionQueue[id]; queue != nil {
|
|
queue.add(tx.Hash())
|
|
}
|
|
}
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// newLogFilter creates a new log filter.
|
|
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
|
|
filter := New(s.chainDb)
|
|
id, err := s.filterManager.Add(filter, LogFilter)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
s.logQueue[id] = &logQueue{timeout: time.Now()}
|
|
|
|
filter.SetBeginBlock(earliest)
|
|
filter.SetEndBlock(latest)
|
|
filter.SetAddresses(addresses)
|
|
filter.SetTopics(topics)
|
|
filter.LogCallback = func(log *vm.Log, removed bool) {
|
|
if callback != nil {
|
|
callback(log, removed)
|
|
} else {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
if queue := s.logQueue[id]; queue != nil {
|
|
queue.add(vmlog{log, removed})
|
|
}
|
|
}
|
|
}
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
|
|
notifier, supported := ctx.Value(rpc.NotifierContextKey).(rpc.Notifier)
|
|
if !supported {
|
|
return nil, rpc.ErrNotificationsUnsupported
|
|
}
|
|
|
|
var (
|
|
externalId string
|
|
subscription rpc.Subscription
|
|
err error
|
|
)
|
|
|
|
if externalId, err = newFilterId(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// uninstall filter when subscription is unsubscribed/cancelled
|
|
if subscription, err = notifier.NewSubscription(func(string) {
|
|
s.UninstallFilter(externalId)
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
notifySubscriber := func(log *vm.Log, removed bool) {
|
|
rpcLog := toRPCLogs(vm.Logs{log}, removed)
|
|
if err := subscription.Notify(rpcLog); err != nil {
|
|
subscription.Cancel()
|
|
}
|
|
}
|
|
|
|
// from and to block number are not used since subscriptions don't allow you to travel to "time"
|
|
var id int
|
|
if len(args.Addresses) > 0 {
|
|
id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
|
|
} else {
|
|
id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
|
|
}
|
|
|
|
if err != nil {
|
|
subscription.Cancel()
|
|
return nil, err
|
|
}
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return subscription, err
|
|
}
|
|
|
|
// NewFilterArgs represents a request to create a new filter.
|
|
type NewFilterArgs struct {
|
|
FromBlock rpc.BlockNumber
|
|
ToBlock rpc.BlockNumber
|
|
Addresses []common.Address
|
|
Topics [][]common.Hash
|
|
}
|
|
|
|
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
|
|
type input struct {
|
|
From *rpc.BlockNumber `json:"fromBlock"`
|
|
ToBlock *rpc.BlockNumber `json:"toBlock"`
|
|
Addresses interface{} `json:"address"`
|
|
Topics interface{} `json:"topics"`
|
|
}
|
|
|
|
var raw input
|
|
if err := json.Unmarshal(data, &raw); err != nil {
|
|
return err
|
|
}
|
|
|
|
if raw.From == nil || raw.From.Int64() < 0 {
|
|
args.FromBlock = rpc.LatestBlockNumber
|
|
} else {
|
|
args.FromBlock = *raw.From
|
|
}
|
|
|
|
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
|
|
args.ToBlock = rpc.LatestBlockNumber
|
|
} else {
|
|
args.ToBlock = *raw.ToBlock
|
|
}
|
|
|
|
args.Addresses = []common.Address{}
|
|
|
|
if raw.Addresses != nil {
|
|
// raw.Address can contain a single address or an array of addresses
|
|
var addresses []common.Address
|
|
|
|
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
|
|
for i, addr := range strAddrs {
|
|
if strAddr, ok := addr.(string); ok {
|
|
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
|
|
strAddr = strAddr[2:]
|
|
}
|
|
if decAddr, err := hex.DecodeString(strAddr); err == nil {
|
|
addresses = append(addresses, common.BytesToAddress(decAddr))
|
|
} else {
|
|
fmt.Errorf("invalid address given")
|
|
}
|
|
} else {
|
|
return fmt.Errorf("invalid address on index %d", i)
|
|
}
|
|
}
|
|
} else if singleAddr, ok := raw.Addresses.(string); ok {
|
|
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
|
|
singleAddr = singleAddr[2:]
|
|
}
|
|
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
|
|
addresses = append(addresses, common.BytesToAddress(decAddr))
|
|
} else {
|
|
fmt.Errorf("invalid address given")
|
|
}
|
|
} else {
|
|
errors.New("invalid address(es) given")
|
|
}
|
|
args.Addresses = addresses
|
|
}
|
|
|
|
topicConverter := func(raw string) (common.Hash, error) {
|
|
if len(raw) == 0 {
|
|
return common.Hash{}, nil
|
|
}
|
|
|
|
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
|
|
raw = raw[2:]
|
|
}
|
|
|
|
if decAddr, err := hex.DecodeString(raw); err == nil {
|
|
return common.BytesToHash(decAddr), nil
|
|
}
|
|
|
|
return common.Hash{}, errors.New("invalid topic given")
|
|
}
|
|
|
|
// topics is an array consisting of strings or arrays of strings
|
|
if raw.Topics != nil {
|
|
topics, ok := raw.Topics.([]interface{})
|
|
if ok {
|
|
parsedTopics := make([][]common.Hash, len(topics))
|
|
for i, topic := range topics {
|
|
if topic == nil {
|
|
parsedTopics[i] = []common.Hash{common.StringToHash("")}
|
|
} else if strTopic, ok := topic.(string); ok {
|
|
if t, err := topicConverter(strTopic); err != nil {
|
|
return fmt.Errorf("invalid topic on index %d", i)
|
|
} else {
|
|
parsedTopics[i] = []common.Hash{t}
|
|
}
|
|
} else if arrTopic, ok := topic.([]interface{}); ok {
|
|
parsedTopics[i] = make([]common.Hash, len(arrTopic))
|
|
for j := 0; j < len(parsedTopics[i]); i++ {
|
|
if arrTopic[j] == nil {
|
|
parsedTopics[i][j] = common.StringToHash("")
|
|
} else if str, ok := arrTopic[j].(string); ok {
|
|
if t, err := topicConverter(str); err != nil {
|
|
return fmt.Errorf("invalid topic on index %d", i)
|
|
} else {
|
|
parsedTopics[i] = []common.Hash{t}
|
|
}
|
|
} else {
|
|
fmt.Errorf("topic[%d][%d] not a string", i, j)
|
|
}
|
|
}
|
|
} else {
|
|
return fmt.Errorf("topic[%d] invalid", i)
|
|
}
|
|
}
|
|
args.Topics = parsedTopics
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
|
|
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
|
|
externalId, err := newFilterId()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var id int
|
|
if len(args.Addresses) > 0 {
|
|
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
|
|
} else {
|
|
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
|
|
}
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
s.filterMapMu.Lock()
|
|
s.filterMapping[externalId] = id
|
|
s.filterMapMu.Unlock()
|
|
|
|
return externalId, nil
|
|
}
|
|
|
|
// GetLogs returns the logs matching the given argument.
|
|
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) []vmlog {
|
|
filter := New(s.chainDb)
|
|
filter.SetBeginBlock(args.FromBlock.Int64())
|
|
filter.SetEndBlock(args.ToBlock.Int64())
|
|
filter.SetAddresses(args.Addresses)
|
|
filter.SetTopics(args.Topics)
|
|
|
|
return toRPCLogs(filter.Find(), false)
|
|
}
|
|
|
|
// UninstallFilter removes the filter with the given filter id.
|
|
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
|
|
s.filterMapMu.Lock()
|
|
defer s.filterMapMu.Unlock()
|
|
|
|
id, ok := s.filterMapping[filterId]
|
|
if !ok {
|
|
return false
|
|
}
|
|
|
|
defer s.filterManager.Remove(id)
|
|
delete(s.filterMapping, filterId)
|
|
|
|
if _, ok := s.logQueue[id]; ok {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
delete(s.logQueue, id)
|
|
return true
|
|
}
|
|
if _, ok := s.blockQueue[id]; ok {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
delete(s.blockQueue, id)
|
|
return true
|
|
}
|
|
if _, ok := s.transactionQueue[id]; ok {
|
|
s.transactionMu.Lock()
|
|
defer s.transactionMu.Unlock()
|
|
delete(s.transactionQueue, id)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// getFilterType is a helper utility that determine the type of filter for the given filter id.
|
|
func (s *PublicFilterAPI) getFilterType(id int) byte {
|
|
if _, ok := s.blockQueue[id]; ok {
|
|
return blockFilterTy
|
|
} else if _, ok := s.transactionQueue[id]; ok {
|
|
return transactionFilterTy
|
|
} else if _, ok := s.logQueue[id]; ok {
|
|
return logFilterTy
|
|
}
|
|
|
|
return unknownFilterTy
|
|
}
|
|
|
|
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
|
|
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if s.blockQueue[id] != nil {
|
|
return s.blockQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// transactionFilterChanged returns a collection of transaction hashes for the pending
|
|
// transaction filter with the given id.
|
|
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
|
|
s.blockMu.Lock()
|
|
defer s.blockMu.Unlock()
|
|
|
|
if s.transactionQueue[id] != nil {
|
|
return s.transactionQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// logFilterChanged returns a collection of logs for the log filter with the given id.
|
|
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
|
|
s.logMu.Lock()
|
|
defer s.logMu.Unlock()
|
|
|
|
if s.logQueue[id] != nil {
|
|
return s.logQueue[id].get()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetFilterLogs returns the logs for the filter with the given id.
|
|
func (s *PublicFilterAPI) GetFilterLogs(filterId string) []vmlog {
|
|
id, ok := s.filterMapping[filterId]
|
|
if !ok {
|
|
return toRPCLogs(nil, false)
|
|
}
|
|
|
|
if filter := s.filterManager.Get(id); filter != nil {
|
|
return toRPCLogs(filter.Find(), false)
|
|
}
|
|
|
|
return toRPCLogs(nil, false)
|
|
}
|
|
|
|
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
|
|
// This can be used for polling.
|
|
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
|
|
s.filterMapMu.Lock()
|
|
id, ok := s.filterMapping[filterId]
|
|
s.filterMapMu.Unlock()
|
|
|
|
if !ok { // filter not found
|
|
return []interface{}{}
|
|
}
|
|
|
|
switch s.getFilterType(id) {
|
|
case blockFilterTy:
|
|
return returnHashes(s.blockFilterChanged(id))
|
|
case transactionFilterTy:
|
|
return returnHashes(s.transactionFilterChanged(id))
|
|
case logFilterTy:
|
|
return s.logFilterChanged(id)
|
|
}
|
|
|
|
return []interface{}{}
|
|
}
|
|
|
|
type vmlog struct {
|
|
*vm.Log
|
|
Removed bool `json:"removed"`
|
|
}
|
|
|
|
type logQueue struct {
|
|
mu sync.Mutex
|
|
|
|
logs []vmlog
|
|
timeout time.Time
|
|
id int
|
|
}
|
|
|
|
func (l *logQueue) add(logs ...vmlog) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.logs = append(l.logs, logs...)
|
|
}
|
|
|
|
func (l *logQueue) get() []vmlog {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.timeout = time.Now()
|
|
tmp := l.logs
|
|
l.logs = nil
|
|
return tmp
|
|
}
|
|
|
|
type hashQueue struct {
|
|
mu sync.Mutex
|
|
|
|
hashes []common.Hash
|
|
timeout time.Time
|
|
id int
|
|
}
|
|
|
|
func (l *hashQueue) add(hashes ...common.Hash) {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.hashes = append(l.hashes, hashes...)
|
|
}
|
|
|
|
func (l *hashQueue) get() []common.Hash {
|
|
l.mu.Lock()
|
|
defer l.mu.Unlock()
|
|
|
|
l.timeout = time.Now()
|
|
tmp := l.hashes
|
|
l.hashes = nil
|
|
return tmp
|
|
}
|
|
|
|
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
|
|
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
|
|
// causing the affected DApp to miss data.
|
|
func newFilterId() (string, error) {
|
|
var subid [16]byte
|
|
n, _ := rand.Read(subid[:])
|
|
if n != 16 {
|
|
return "", errors.New("Unable to generate filter id")
|
|
}
|
|
return "0x" + hex.EncodeToString(subid[:]), nil
|
|
}
|
|
|
|
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
|
|
// can hold additional information about the logs such as whether it was deleted.
|
|
// Additionally when nil is given it will by default instead create an empty slice
|
|
// instead. This is required by the RPC specification.
|
|
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
|
|
convertedLogs := make([]vmlog, len(logs))
|
|
for i, log := range logs {
|
|
convertedLogs[i] = vmlog{Log: log, Removed: removed}
|
|
}
|
|
return convertedLogs
|
|
}
|
|
|
|
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
|
|
// return the given hashes. The RPC interfaces defines that always an array is returned.
|
|
func returnHashes(hashes []common.Hash) []common.Hash {
|
|
if hashes == nil {
|
|
return []common.Hash{}
|
|
}
|
|
return hashes
|
|
}
|