go-ethereum/eth/filters/api.go
Felix Lange c213fd1fd8 all: import "context" instead of "golang.org/x/net/context"
There is no need to depend on the old context package now that the
minimum Go version is 1.7. The move to "context" eliminates our weird
vendoring setup. Some vendored code still uses golang.org/x/net/context
and it is now vendored in the normal way.

This change triggered new vet checks around context.WithTimeout which
didn't fire with golang.org/x/net/context.
2017-03-22 20:49:15 +01:00

555 lines
15 KiB
Go

// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
var (
deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
)
// filter is a helper struct that holds meta information over the filter type
// and associated subscription in the event system.
type filter struct {
typ Type
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
crit FilterCriteria
logs []*types.Log
s *Subscription // associated subscription in event system
}
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
backend Backend
useMipMap bool
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
events *EventSystem
filtersMu sync.Mutex
filters map[rpc.ID]*filter
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
api := &PublicFilterAPI{
backend: backend,
useMipMap: !lightMode,
mux: backend.EventMux(),
chainDb: backend.ChainDb(),
events: NewEventSystem(backend.EventMux(), backend, lightMode),
filters: make(map[rpc.ID]*filter),
}
go api.timeoutLoop()
return api
}
// timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
// Tt is started when the api is created.
func (api *PublicFilterAPI) timeoutLoop() {
ticker := time.NewTicker(5 * time.Minute)
for {
<-ticker.C
api.filtersMu.Lock()
for id, f := range api.filters {
select {
case <-f.deadline.C:
f.s.Unsubscribe()
delete(api.filters, id)
default:
continue
}
}
api.filtersMu.Unlock()
}
}
// NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
// as transactions enter the pending state.
//
// It is part of the filter package because this filter can be used throug the
// `eth_getFilterChanges` polling method that is also used for log filters.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan common.Hash)
pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs)
)
api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case ph := <-pendingTxs:
api.filtersMu.Lock()
if f, found := api.filters[pendingTxSub.ID]; found {
f.hashes = append(f.hashes, ph)
}
api.filtersMu.Unlock()
case <-pendingTxSub.Err():
api.filtersMu.Lock()
delete(api.filters, pendingTxSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return pendingTxSub.ID
}
// NewPendingTransactions creates a subscription that is triggered each time a transaction
// enters the transaction pool and was signed from one of the transactions this nodes manages.
func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
txHashes := make(chan common.Hash)
pendingTxSub := api.events.SubscribePendingTxEvents(txHashes)
for {
select {
case h := <-txHashes:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
pendingTxSub.Unsubscribe()
return
case <-notifier.Closed():
pendingTxSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
// It is part of the filter package since polling goes with eth_getFilterChanges.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
var (
headers = make(chan *types.Header)
headerSub = api.events.SubscribeNewHeads(headers)
)
api.filtersMu.Lock()
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case h := <-headers:
api.filtersMu.Lock()
if f, found := api.filters[headerSub.ID]; found {
f.hashes = append(f.hashes, h.Hash())
}
api.filtersMu.Unlock()
case <-headerSub.Err():
api.filtersMu.Lock()
delete(api.filters, headerSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return headerSub.ID
}
// NewHeads send a notification each time a new (header) block is appended to the chain.
func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
headers := make(chan *types.Header)
headersSub := api.events.SubscribeNewHeads(headers)
for {
select {
case h := <-headers:
notifier.Notify(rpcSub.ID, h)
case <-rpcSub.Err():
headersSub.Unsubscribe()
return
case <-notifier.Closed():
headersSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
var (
rpcSub = notifier.CreateSubscription()
matchedLogs = make(chan []*types.Log)
)
logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
if err != nil {
return nil, err
}
go func() {
for {
select {
case logs := <-matchedLogs:
for _, log := range logs {
notifier.Notify(rpcSub.ID, &log)
}
case <-rpcSub.Err(): // client send an unsubscribe request
logsSub.Unsubscribe()
return
case <-notifier.Closed(): // connection dropped
logsSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// FilterCriteria represents a request to create a new filter.
type FilterCriteria struct {
FromBlock *big.Int
ToBlock *big.Int
Addresses []common.Address
Topics [][]common.Hash
}
// NewFilter creates a new filter and returns the filter id. It can be
// used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state.
//
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
logs := make(chan []*types.Log)
logsSub, err := api.events.SubscribeLogs(crit, logs)
if err != nil {
return rpc.ID(""), err
}
api.filtersMu.Lock()
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
api.filtersMu.Unlock()
go func() {
for {
select {
case l := <-logs:
api.filtersMu.Lock()
if f, found := api.filters[logsSub.ID]; found {
f.logs = append(f.logs, l...)
}
api.filtersMu.Unlock()
case <-logsSub.Err():
api.filtersMu.Lock()
delete(api.filters, logsSub.ID)
api.filtersMu.Unlock()
return
}
}
}()
return logsSub.ID, nil
}
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
filter := New(api.backend, api.useMipMap)
filter.SetBeginBlock(crit.FromBlock.Int64())
filter.SetEndBlock(crit.ToBlock.Int64())
filter.SetAddresses(crit.Addresses)
filter.SetTopics(crit.Topics)
logs, err := filter.Find(ctx)
return returnLogs(logs), err
}
// UninstallFilter removes the filter with the given filter id.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
api.filtersMu.Lock()
f, found := api.filters[id]
if found {
delete(api.filters, id)
}
api.filtersMu.Unlock()
if found {
f.s.Unsubscribe()
}
return found
}
// GetFilterLogs returns the logs for the filter with the given id.
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
api.filtersMu.Lock()
f, found := api.filters[id]
api.filtersMu.Unlock()
if !found || f.typ != LogsSubscription {
return nil, fmt.Errorf("filter not found")
}
filter := New(api.backend, api.useMipMap)
if f.crit.FromBlock != nil {
filter.SetBeginBlock(f.crit.FromBlock.Int64())
} else {
filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
}
if f.crit.ToBlock != nil {
filter.SetEndBlock(f.crit.ToBlock.Int64())
} else {
filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
}
filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics)
logs, err := filter.Find(ctx)
if err != nil {
return nil, err
}
return returnLogs(logs), nil
}
// GetFilterChanges returns the logs for the filter with the given id since
// last time is was called. This can be used for polling.
//
// For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock()
defer api.filtersMu.Unlock()
if f, found := api.filters[id]; found {
if !f.deadline.Stop() {
// timer expired but filter is not yet removed in timeout loop
// receive timer value and reset timer
<-f.deadline.C
}
f.deadline.Reset(deadline)
switch f.typ {
case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes
f.hashes = nil
return returnHashes(hashes), nil
case LogsSubscription:
logs := f.logs
f.logs = nil
return returnLogs(logs), nil
}
}
return []interface{}{}, fmt.Errorf("filter not found")
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
// otherwise the given hashes array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}
// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []*types.Log) []*types.Log {
if logs == nil {
return []*types.Log{}
}
return logs
}
// UnmarshalJSON sets *args fields with given data.
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics []interface{} `json:"topics"`
}
var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
if raw.From != nil {
args.FromBlock = big.NewInt(raw.From.Int64())
}
if raw.ToBlock != nil {
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
}
args.Addresses = []common.Address{}
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
switch rawAddr := raw.Addresses.(type) {
case []interface{}:
for i, addr := range rawAddr {
if strAddr, ok := addr.(string); ok {
addr, err := decodeAddress(strAddr)
if err != nil {
return fmt.Errorf("invalid address at index %d: %v", i, err)
}
args.Addresses = append(args.Addresses, addr)
} else {
return fmt.Errorf("non-string address at index %d", i)
}
}
case string:
addr, err := decodeAddress(rawAddr)
if err != nil {
return fmt.Errorf("invalid address: %v", err)
}
args.Addresses = []common.Address{addr}
default:
return errors.New("invalid addresses in query")
}
}
// topics is an array consisting of strings and/or arrays of strings.
// JSON null values are converted to common.Hash{} and ignored by the filter manager.
if len(raw.Topics) > 0 {
args.Topics = make([][]common.Hash, len(raw.Topics))
for i, t := range raw.Topics {
switch topic := t.(type) {
case nil:
// ignore topic when matching logs
args.Topics[i] = []common.Hash{{}}
case string:
// match specific topic
top, err := decodeTopic(topic)
if err != nil {
return err
}
args.Topics[i] = []common.Hash{top}
case []interface{}:
// or case e.g. [null, "topic0", "topic1"]
for _, rawTopic := range topic {
if rawTopic == nil {
args.Topics[i] = append(args.Topics[i], common.Hash{})
} else if topic, ok := rawTopic.(string); ok {
parsed, err := decodeTopic(topic)
if err != nil {
return err
}
args.Topics[i] = append(args.Topics[i], parsed)
} else {
return fmt.Errorf("invalid topic(s)")
}
}
default:
return fmt.Errorf("invalid topic(s)")
}
}
}
return nil
}
func decodeAddress(s string) (common.Address, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.AddressLength {
err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
}
return common.BytesToAddress(b), err
}
func decodeTopic(s string) (common.Hash, error) {
b, err := hexutil.Decode(s)
if err == nil && len(b) != common.HashLength {
err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
}
return common.BytesToHash(b), err
}