5b1a04b9c7
This PR adds a way to subscribe to the _full_ pending transactions, as opposed to just being notified about hashes. In use cases where client subscribes to newPendingTransactions and gets txhashes only to then request the actual transaction, the caller can now shortcut that flow and obtain the transactions directly. Co-authored-by: Felix Lange <fjl@twurst.com>
574 lines
16 KiB
Go
574 lines
16 KiB
Go
// Copyright 2015 The go-ethereum Authors
|
|
// This file is part of the go-ethereum library.
|
|
//
|
|
// The go-ethereum library is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Lesser General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// The go-ethereum library is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Lesser General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Lesser General Public License
|
|
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
package filters
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math/big"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum"
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/rpc"
|
|
)
|
|
|
|
// filter is a helper struct that holds meta information over the filter type
|
|
// and associated subscription in the event system.
|
|
type filter struct {
|
|
typ Type
|
|
deadline *time.Timer // filter is inactive when deadline triggers
|
|
hashes []common.Hash
|
|
txs []*types.Transaction
|
|
crit FilterCriteria
|
|
logs []*types.Log
|
|
s *Subscription // associated subscription in event system
|
|
}
|
|
|
|
// FilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
|
|
// information related to the Ethereum protocol such als blocks, transactions and logs.
|
|
type FilterAPI struct {
|
|
sys *FilterSystem
|
|
events *EventSystem
|
|
filtersMu sync.Mutex
|
|
filters map[rpc.ID]*filter
|
|
timeout time.Duration
|
|
}
|
|
|
|
// NewFilterAPI returns a new FilterAPI instance.
|
|
func NewFilterAPI(system *FilterSystem, lightMode bool) *FilterAPI {
|
|
api := &FilterAPI{
|
|
sys: system,
|
|
events: NewEventSystem(system, lightMode),
|
|
filters: make(map[rpc.ID]*filter),
|
|
timeout: system.cfg.Timeout,
|
|
}
|
|
go api.timeoutLoop(system.cfg.Timeout)
|
|
|
|
return api
|
|
}
|
|
|
|
// timeoutLoop runs at the interval set by 'timeout' and deletes filters
|
|
// that have not been recently used. It is started when the API is created.
|
|
func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
|
|
var toUninstall []*Subscription
|
|
ticker := time.NewTicker(timeout)
|
|
defer ticker.Stop()
|
|
for {
|
|
<-ticker.C
|
|
api.filtersMu.Lock()
|
|
for id, f := range api.filters {
|
|
select {
|
|
case <-f.deadline.C:
|
|
toUninstall = append(toUninstall, f.s)
|
|
delete(api.filters, id)
|
|
default:
|
|
continue
|
|
}
|
|
}
|
|
api.filtersMu.Unlock()
|
|
|
|
// Unsubscribes are processed outside the lock to avoid the following scenario:
|
|
// event loop attempts broadcasting events to still active filters while
|
|
// Unsubscribe is waiting for it to process the uninstall request.
|
|
for _, s := range toUninstall {
|
|
s.Unsubscribe()
|
|
}
|
|
toUninstall = nil
|
|
}
|
|
}
|
|
|
|
// NewPendingTransactionFilter creates a filter that fetches pending transactions
|
|
// as transactions enter the pending state.
|
|
//
|
|
// It is part of the filter package because this filter can be used through the
|
|
// `eth_getFilterChanges` polling method that is also used for log filters.
|
|
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
|
|
var (
|
|
pendingTxs = make(chan []*types.Transaction)
|
|
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
|
|
)
|
|
|
|
api.filtersMu.Lock()
|
|
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case pTx := <-pendingTxs:
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[pendingTxSub.ID]; found {
|
|
f.txs = append(f.txs, pTx...)
|
|
}
|
|
api.filtersMu.Unlock()
|
|
case <-pendingTxSub.Err():
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, pendingTxSub.ID)
|
|
api.filtersMu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return pendingTxSub.ID
|
|
}
|
|
|
|
// NewPendingTransactions creates a subscription that is triggered each time a
|
|
// transaction enters the transaction pool. If fullTx is true the full tx is
|
|
// sent to the client, otherwise the hash is sent.
|
|
func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) (*rpc.Subscription, error) {
|
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
|
if !supported {
|
|
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
|
}
|
|
|
|
rpcSub := notifier.CreateSubscription()
|
|
|
|
go func() {
|
|
txs := make(chan []*types.Transaction, 128)
|
|
pendingTxSub := api.events.SubscribePendingTxs(txs)
|
|
|
|
for {
|
|
select {
|
|
case txs := <-txs:
|
|
// To keep the original behaviour, send a single tx hash in one notification.
|
|
// TODO(rjl493456442) Send a batch of tx hashes in one notification
|
|
for _, tx := range txs {
|
|
if fullTx != nil && *fullTx {
|
|
notifier.Notify(rpcSub.ID, tx)
|
|
} else {
|
|
notifier.Notify(rpcSub.ID, tx.Hash())
|
|
}
|
|
}
|
|
case <-rpcSub.Err():
|
|
pendingTxSub.Unsubscribe()
|
|
return
|
|
case <-notifier.Closed():
|
|
pendingTxSub.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return rpcSub, nil
|
|
}
|
|
|
|
// NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
|
|
// It is part of the filter package since polling goes with eth_getFilterChanges.
|
|
func (api *FilterAPI) NewBlockFilter() rpc.ID {
|
|
var (
|
|
headers = make(chan *types.Header)
|
|
headerSub = api.events.SubscribeNewHeads(headers)
|
|
)
|
|
|
|
api.filtersMu.Lock()
|
|
api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(api.timeout), hashes: make([]common.Hash, 0), s: headerSub}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case h := <-headers:
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[headerSub.ID]; found {
|
|
f.hashes = append(f.hashes, h.Hash())
|
|
}
|
|
api.filtersMu.Unlock()
|
|
case <-headerSub.Err():
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, headerSub.ID)
|
|
api.filtersMu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return headerSub.ID
|
|
}
|
|
|
|
// NewHeads send a notification each time a new (header) block is appended to the chain.
|
|
func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
|
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
|
if !supported {
|
|
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
|
}
|
|
|
|
rpcSub := notifier.CreateSubscription()
|
|
|
|
go func() {
|
|
headers := make(chan *types.Header)
|
|
headersSub := api.events.SubscribeNewHeads(headers)
|
|
|
|
for {
|
|
select {
|
|
case h := <-headers:
|
|
notifier.Notify(rpcSub.ID, h)
|
|
case <-rpcSub.Err():
|
|
headersSub.Unsubscribe()
|
|
return
|
|
case <-notifier.Closed():
|
|
headersSub.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return rpcSub, nil
|
|
}
|
|
|
|
// Logs creates a subscription that fires for all new log that match the given filter criteria.
|
|
func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
|
if !supported {
|
|
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
|
}
|
|
|
|
var (
|
|
rpcSub = notifier.CreateSubscription()
|
|
matchedLogs = make(chan []*types.Log)
|
|
)
|
|
|
|
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case logs := <-matchedLogs:
|
|
for _, log := range logs {
|
|
log := log
|
|
notifier.Notify(rpcSub.ID, &log)
|
|
}
|
|
case <-rpcSub.Err(): // client send an unsubscribe request
|
|
logsSub.Unsubscribe()
|
|
return
|
|
case <-notifier.Closed(): // connection dropped
|
|
logsSub.Unsubscribe()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return rpcSub, nil
|
|
}
|
|
|
|
// FilterCriteria represents a request to create a new filter.
|
|
// Same as ethereum.FilterQuery but with UnmarshalJSON() method.
|
|
type FilterCriteria ethereum.FilterQuery
|
|
|
|
// NewFilter creates a new filter and returns the filter id. It can be
|
|
// used to retrieve logs when the state changes. This method cannot be
|
|
// used to fetch logs that are already stored in the state.
|
|
//
|
|
// Default criteria for the from and to block are "latest".
|
|
// Using "latest" as block number will return logs for mined blocks.
|
|
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
|
|
// In case logs are removed (chain reorg) previously returned logs are returned
|
|
// again but with the removed property set to true.
|
|
//
|
|
// In case "fromBlock" > "toBlock" an error is returned.
|
|
func (api *FilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
|
|
logs := make(chan []*types.Log)
|
|
logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
api.filtersMu.Lock()
|
|
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(api.timeout), logs: make([]*types.Log, 0), s: logsSub}
|
|
api.filtersMu.Unlock()
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case l := <-logs:
|
|
api.filtersMu.Lock()
|
|
if f, found := api.filters[logsSub.ID]; found {
|
|
f.logs = append(f.logs, l...)
|
|
}
|
|
api.filtersMu.Unlock()
|
|
case <-logsSub.Err():
|
|
api.filtersMu.Lock()
|
|
delete(api.filters, logsSub.ID)
|
|
api.filtersMu.Unlock()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return logsSub.ID, nil
|
|
}
|
|
|
|
// GetLogs returns logs matching the given argument that are stored within the state.
|
|
func (api *FilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
|
|
var filter *Filter
|
|
if crit.BlockHash != nil {
|
|
// Block filter requested, construct a single-shot filter
|
|
filter = api.sys.NewBlockFilter(*crit.BlockHash, crit.Addresses, crit.Topics)
|
|
} else {
|
|
// Convert the RPC block numbers into internal representations
|
|
begin := rpc.LatestBlockNumber.Int64()
|
|
if crit.FromBlock != nil {
|
|
begin = crit.FromBlock.Int64()
|
|
}
|
|
end := rpc.LatestBlockNumber.Int64()
|
|
if crit.ToBlock != nil {
|
|
end = crit.ToBlock.Int64()
|
|
}
|
|
// Construct the range filter
|
|
filter = api.sys.NewRangeFilter(begin, end, crit.Addresses, crit.Topics)
|
|
}
|
|
// Run the filter and return all the logs
|
|
logs, err := filter.Logs(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return returnLogs(logs), err
|
|
}
|
|
|
|
// UninstallFilter removes the filter with the given filter id.
|
|
func (api *FilterAPI) UninstallFilter(id rpc.ID) bool {
|
|
api.filtersMu.Lock()
|
|
f, found := api.filters[id]
|
|
if found {
|
|
delete(api.filters, id)
|
|
}
|
|
api.filtersMu.Unlock()
|
|
if found {
|
|
f.s.Unsubscribe()
|
|
}
|
|
|
|
return found
|
|
}
|
|
|
|
// GetFilterLogs returns the logs for the filter with the given id.
|
|
// If the filter could not be found an empty array of logs is returned.
|
|
func (api *FilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
|
|
api.filtersMu.Lock()
|
|
f, found := api.filters[id]
|
|
api.filtersMu.Unlock()
|
|
|
|
if !found || f.typ != LogsSubscription {
|
|
return nil, fmt.Errorf("filter not found")
|
|
}
|
|
|
|
var filter *Filter
|
|
if f.crit.BlockHash != nil {
|
|
// Block filter requested, construct a single-shot filter
|
|
filter = api.sys.NewBlockFilter(*f.crit.BlockHash, f.crit.Addresses, f.crit.Topics)
|
|
} else {
|
|
// Convert the RPC block numbers into internal representations
|
|
begin := rpc.LatestBlockNumber.Int64()
|
|
if f.crit.FromBlock != nil {
|
|
begin = f.crit.FromBlock.Int64()
|
|
}
|
|
end := rpc.LatestBlockNumber.Int64()
|
|
if f.crit.ToBlock != nil {
|
|
end = f.crit.ToBlock.Int64()
|
|
}
|
|
// Construct the range filter
|
|
filter = api.sys.NewRangeFilter(begin, end, f.crit.Addresses, f.crit.Topics)
|
|
}
|
|
// Run the filter and return all the logs
|
|
logs, err := filter.Logs(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return returnLogs(logs), nil
|
|
}
|
|
|
|
// GetFilterChanges returns the logs for the filter with the given id since
|
|
// last time it was called. This can be used for polling.
|
|
//
|
|
// For pending transaction and block filters the result is []common.Hash.
|
|
// (pending)Log filters return []Log.
|
|
func (api *FilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
|
|
api.filtersMu.Lock()
|
|
defer api.filtersMu.Unlock()
|
|
|
|
if f, found := api.filters[id]; found {
|
|
if !f.deadline.Stop() {
|
|
// timer expired but filter is not yet removed in timeout loop
|
|
// receive timer value and reset timer
|
|
<-f.deadline.C
|
|
}
|
|
f.deadline.Reset(api.timeout)
|
|
|
|
switch f.typ {
|
|
case BlocksSubscription:
|
|
hashes := f.hashes
|
|
f.hashes = nil
|
|
return returnHashes(hashes), nil
|
|
case PendingTransactionsSubscription:
|
|
txs := f.txs
|
|
f.txs = nil
|
|
return txs, nil
|
|
case LogsSubscription, MinedAndPendingLogsSubscription:
|
|
logs := f.logs
|
|
f.logs = nil
|
|
return returnLogs(logs), nil
|
|
}
|
|
}
|
|
|
|
return []interface{}{}, fmt.Errorf("filter not found")
|
|
}
|
|
|
|
// returnHashes is a helper that will return an empty hash array case the given hash array is nil,
|
|
// otherwise the given hashes array is returned.
|
|
func returnHashes(hashes []common.Hash) []common.Hash {
|
|
if hashes == nil {
|
|
return []common.Hash{}
|
|
}
|
|
return hashes
|
|
}
|
|
|
|
// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
|
|
// otherwise the given logs array is returned.
|
|
func returnLogs(logs []*types.Log) []*types.Log {
|
|
if logs == nil {
|
|
return []*types.Log{}
|
|
}
|
|
return logs
|
|
}
|
|
|
|
// UnmarshalJSON sets *args fields with given data.
|
|
func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
|
|
type input struct {
|
|
BlockHash *common.Hash `json:"blockHash"`
|
|
FromBlock *rpc.BlockNumber `json:"fromBlock"`
|
|
ToBlock *rpc.BlockNumber `json:"toBlock"`
|
|
Addresses interface{} `json:"address"`
|
|
Topics []interface{} `json:"topics"`
|
|
}
|
|
|
|
var raw input
|
|
if err := json.Unmarshal(data, &raw); err != nil {
|
|
return err
|
|
}
|
|
|
|
if raw.BlockHash != nil {
|
|
if raw.FromBlock != nil || raw.ToBlock != nil {
|
|
// BlockHash is mutually exclusive with FromBlock/ToBlock criteria
|
|
return fmt.Errorf("cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other")
|
|
}
|
|
args.BlockHash = raw.BlockHash
|
|
} else {
|
|
if raw.FromBlock != nil {
|
|
args.FromBlock = big.NewInt(raw.FromBlock.Int64())
|
|
}
|
|
|
|
if raw.ToBlock != nil {
|
|
args.ToBlock = big.NewInt(raw.ToBlock.Int64())
|
|
}
|
|
}
|
|
|
|
args.Addresses = []common.Address{}
|
|
|
|
if raw.Addresses != nil {
|
|
// raw.Address can contain a single address or an array of addresses
|
|
switch rawAddr := raw.Addresses.(type) {
|
|
case []interface{}:
|
|
for i, addr := range rawAddr {
|
|
if strAddr, ok := addr.(string); ok {
|
|
addr, err := decodeAddress(strAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid address at index %d: %v", i, err)
|
|
}
|
|
args.Addresses = append(args.Addresses, addr)
|
|
} else {
|
|
return fmt.Errorf("non-string address at index %d", i)
|
|
}
|
|
}
|
|
case string:
|
|
addr, err := decodeAddress(rawAddr)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid address: %v", err)
|
|
}
|
|
args.Addresses = []common.Address{addr}
|
|
default:
|
|
return errors.New("invalid addresses in query")
|
|
}
|
|
}
|
|
|
|
// topics is an array consisting of strings and/or arrays of strings.
|
|
// JSON null values are converted to common.Hash{} and ignored by the filter manager.
|
|
if len(raw.Topics) > 0 {
|
|
args.Topics = make([][]common.Hash, len(raw.Topics))
|
|
for i, t := range raw.Topics {
|
|
switch topic := t.(type) {
|
|
case nil:
|
|
// ignore topic when matching logs
|
|
|
|
case string:
|
|
// match specific topic
|
|
top, err := decodeTopic(topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
args.Topics[i] = []common.Hash{top}
|
|
|
|
case []interface{}:
|
|
// or case e.g. [null, "topic0", "topic1"]
|
|
for _, rawTopic := range topic {
|
|
if rawTopic == nil {
|
|
// null component, match all
|
|
args.Topics[i] = nil
|
|
break
|
|
}
|
|
if topic, ok := rawTopic.(string); ok {
|
|
parsed, err := decodeTopic(topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
args.Topics[i] = append(args.Topics[i], parsed)
|
|
} else {
|
|
return fmt.Errorf("invalid topic(s)")
|
|
}
|
|
}
|
|
default:
|
|
return fmt.Errorf("invalid topic(s)")
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func decodeAddress(s string) (common.Address, error) {
|
|
b, err := hexutil.Decode(s)
|
|
if err == nil && len(b) != common.AddressLength {
|
|
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for address", len(b), common.AddressLength)
|
|
}
|
|
return common.BytesToAddress(b), err
|
|
}
|
|
|
|
func decodeTopic(s string) (common.Hash, error) {
|
|
b, err := hexutil.Decode(s)
|
|
if err == nil && len(b) != common.HashLength {
|
|
err = fmt.Errorf("hex has invalid length %d after decoding; expected %d for topic", len(b), common.HashLength)
|
|
}
|
|
return common.BytesToHash(b), err
|
|
}
|