laconicd/rpc/filters.go
thomasmodeneis c9b09c1d55
Implement eth_newPendingTransactionFilter #51 (#251)
* Implement eth_newPendingTransactionFilter
2020-04-16 16:53:14 +02:00

336 lines
7.7 KiB
Go

package rpc
import (
"errors"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/log"
)
/*
- Filter functions derived from go-ethereum
Used to set the criteria passed in from RPC params
*/
// Filter kinds stored in Filter.typ.
const (
	// blockFilter identifies a filter that reports new block hashes.
	blockFilter = "block"
	// pendingTxFilter identifies a filter that reports pending transaction hashes.
	pendingTxFilter = "pending"
	// logFilter identifies a filter that reports matching logs.
	logFilter = "log"
)
// Filter can be used to retrieve and filter logs, blocks, or pending transactions.
//
// The typ field selects the behaviour: log filters are evaluated on demand by
// getFilterLogs, while block and pending-transaction filters are fed by a
// polling goroutine that appends to hashes until the filter is stopped.
type Filter struct {
backend Backend // chain data source queried by the filter
fromBlock, toBlock *big.Int // start and end block numbers
addresses []common.Address // contract addresses to watch
topics [][]common.Hash // log topics to watch for
blockHash *common.Hash // Block hash if filtering a single block
typ string // one of blockFilter, pendingTxFilter or logFilter
hashes []common.Hash // filtered block or transaction hashes collected since the last getFilterChanges call
logs []*ethtypes.Log //nolint // filtered logs
stopped bool // set to true once the filter is uninstalled; polling loops exit when they see it
err error // last error recorded by a polling goroutine or by getFilterLogs
}
// NewFilter builds a log filter from the given RPC criteria. The block range,
// addresses and topics are taken from criteria; the block hash is left unset
// and the filter starts in the running (not stopped) state.
func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter {
	return &Filter{
		typ:       logFilter,
		backend:   backend,
		addresses: criteria.Addresses,
		topics:    criteria.Topics,
		fromBlock: criteria.FromBlock,
		toBlock:   criteria.ToBlock,
	}
}
// NewFilterWithBlockHash builds a log filter exactly like NewFilter and
// additionally pins it to the block hash carried by criteria.
func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) *Filter {
	pinned := NewFilter(backend, criteria)
	pinned.blockHash = criteria.BlockHash
	return pinned
}
// NewBlockFilter creates a block filter and starts a goroutine that polls the
// backend for new blocks. If polling fails, the error is stored on the filter
// and the goroutine ends.
func NewBlockFilter(backend Backend) *Filter {
	bf := NewFilter(backend, &filters.FilterCriteria{})
	bf.typ = blockFilter

	go func() {
		// Only a failure is recorded; a clean stop leaves bf.err untouched.
		if pollErr := bf.pollForBlocks(); pollErr != nil {
			bf.err = pollErr
		}
	}()

	return bf
}
// pollForBlocks watches the backend's block height and appends the hash of
// every newly observed block to f.hashes. It returns nil once the filter has
// been stopped, or the first backend/conversion error it encounters.
//
// The loop sleeps between polls instead of spinning, so it no longer pins a
// CPU core and floods the backend with BlockNumber calls while the chain is
// idle. Because several blocks may be produced during one sleep, every height
// between the previously seen block and the current one is fetched, so no
// block hash is skipped.
//
// NOTE(review): f.stopped and f.hashes are also touched by uninstallFilter and
// getFilterChanges with no synchronization visible in this file - confirm
// whether callers serialize access.
func (f *Filter) pollForBlocks() error {
	// Same cadence as pollForTransactions.
	const pollInterval = time.Second

	var prev hexutil.Uint64
	for {
		if f.stopped {
			return nil
		}

		num, err := f.backend.BlockNumber()
		if err != nil {
			return err
		}

		if num != prev {
			// On the first observation (prev == 0), or if the height moved
			// backwards, report only the current block. Otherwise backfill
			// every height produced since the last poll.
			first := num
			if prev != 0 && prev < num {
				first = prev + 1
			}

			for n := first; n <= num; n++ {
				block, err := f.backend.GetBlockByNumber(BlockNumber(n), false)
				if err != nil {
					return err
				}

				hashBytes, ok := block["hash"].(hexutil.Bytes)
				if !ok {
					return errors.New("could not convert block hash to hexutil.Bytes")
				}

				f.hashes = append(f.hashes, common.BytesToHash(hashBytes))
			}

			prev = num
		}

		time.Sleep(pollInterval)
	}
}
// pollForTransactions polls the backend's pending transactions once per second
// and appends the hash of each newly pending transaction to f.hashes. It
// returns nil once the filter has been stopped, or the first backend error.
//
// De-duplicating against f.hashes alone is not enough: getFilterChanges
// empties f.hashes on every read, so a transaction that stays pending across
// reads would be reported again on each one. The hashes seen on the previous
// poll are therefore remembered locally and skipped. Only the latest pending
// set is kept, so the memory used stays bounded by the pending pool size.
func (f *Filter) pollForTransactions() error {
	// Hashes that were pending (and thus already reported) on the previous poll.
	reported := make(map[common.Hash]struct{})

	for {
		if f.stopped {
			return nil
		}

		txs, err := f.backend.PendingTransactions()
		if err != nil {
			return err
		}

		pending := make(map[common.Hash]struct{}, len(txs))
		for _, tx := range txs {
			pending[tx.Hash] = struct{}{}

			if _, done := reported[tx.Hash]; done {
				continue
			}

			// Still guard against duplicates inside the unread buffer, e.g.
			// the same hash appearing twice in one batch.
			if !contains(f.hashes, tx.Hash) {
				f.hashes = append(f.hashes, tx.Hash)
			}
		}
		reported = pending

		time.Sleep(time.Second)
	}
}
// contains reports whether item is present in slice.
//
// A direct scan is used: building a map of the whole slice for a single
// lookup costs an allocation plus a full pass and never pays off.
func contains(slice []common.Hash, item common.Hash) bool {
	for _, s := range slice {
		if s == item {
			return true
		}
	}
	return false
}
// NewPendingTransactionFilter creates a pending-transaction filter and starts
// a goroutine that polls the backend for pending transactions. If polling
// fails, the error is stored on the filter and the goroutine ends.
func NewPendingTransactionFilter(backend Backend) *Filter {
	ptf := NewFilter(backend, &filters.FilterCriteria{})
	ptf.typ = pendingTxFilter

	go func() {
		// Only a failure is recorded; a clean stop leaves ptf.err untouched.
		if pollErr := ptf.pollForTransactions(); pollErr != nil {
			ptf.err = pollErr
		}
	}()

	return ptf
}
// uninstallFilter marks the filter as stopped. The polling loops
// (pollForBlocks and pollForTransactions) check this flag at the top of each
// iteration and return once it is set.
func (f *Filter) uninstallFilter() {
f.stopped = true
}
// getFilterChanges returns what the filter has to report, depending on its type:
//   - log filters evaluate getFilterLogs and return its result;
//   - block and pending-transaction filters return a copy of the hashes
//     collected so far and reset the buffer, or return the stored polling
//     error if there is one;
//   - any other type yields an "unsupported filter" error.
func (f *Filter) getFilterChanges() (interface{}, error) {
	switch f.typ {
	case logFilter:
		return f.getFilterLogs()

	case blockFilter, pendingTxFilter:
		if f.err != nil {
			return nil, f.err
		}

		// Hand out a private copy so the caller never shares the buffer the
		// polling goroutine appends to.
		drained := make([]common.Hash, len(f.hashes))
		copy(drained, f.hashes)
		f.hashes = []common.Hash{}
		return drained, nil

	default:
		return nil, errors.New("unsupported filter")
	}
}
// getFilterLogs returns the logs that match the filter criteria.
//
// When f.blockHash is set, only that block is inspected and the range scan is
// never performed. Otherwise every block in [f.fromBlock, f.toBlock] is
// scanned; a nil or zero bound is replaced by the backend's current block
// number and stored back on the filter.
//
// A failure to fetch the block by hash or the current block number is returned
// to the caller. Errors hit while scanning a range are recorded in f.err and
// end the scan early; the logs gathered up to that point are still returned
// with a nil error.
func (f *Filter) getFilterLogs() ([]*ethtypes.Log, error) {
	ret := []*ethtypes.Log{}

	// filter specific block only
	if f.blockHash != nil {
		block, err := f.backend.GetBlockByHash(*f.blockHash, true)
		if err != nil {
			return nil, err
		}

		// A block-hash query must not fall through to the range scan below:
		// a block without (usable) transactions simply has no logs to offer.
		txs, ok := block["transactions"].([]common.Hash)
		if !ok || len(txs) == 0 {
			return ret, nil
		}
		return f.checkMatches(block)
	}

	// filter range of blocks
	num, err := f.backend.BlockNumber()
	if err != nil {
		return nil, err
	}

	// if f.fromBlock is set to 0, set it to the latest block number
	if f.fromBlock == nil || f.fromBlock.Cmp(big.NewInt(0)) == 0 {
		f.fromBlock = big.NewInt(int64(num))
	}

	// if f.toBlock is set to 0, set it to the latest block number
	if f.toBlock == nil || f.toBlock.Cmp(big.NewInt(0)) == 0 {
		f.toBlock = big.NewInt(int64(num))
	}

	log.Debug("[ethAPI] Retrieving filter logs", "fromBlock", f.fromBlock, "toBlock", f.toBlock,
		"topics", f.topics, "addresses", f.addresses)

	from := f.fromBlock.Int64()
	to := f.toBlock.Int64()

	for i := from; i <= to; i++ {
		block, err := f.backend.GetBlockByNumber(NewBlockNumber(big.NewInt(i)), true)
		if err != nil {
			f.err = err
			// Log the requested height; the block returned alongside an
			// error is not a reliable source for its number.
			log.Debug("[ethAPI] Cannot get block", "block", i, "error", err)
			break
		}

		log.Debug("[ethAPI] filtering", "block", block)

		// TODO: block logsBloom is often set in the wrong block
		// Blocks without (usable) transactions cannot contribute any logs.
		txs, ok := block["transactions"].([]common.Hash)
		if !ok || len(txs) == 0 {
			continue
		}

		logs, err := f.checkMatches(block)
		if err != nil {
			f.err = err
			break
		}

		ret = append(ret, logs...)
	}

	return ret, nil
}
// checkMatches gathers the logs of every transaction listed in block and
// returns those that satisfy the filter's block range, addresses and topics.
// It fails if the block's "transactions" entry is not a []common.Hash or if
// the backend cannot provide the logs of one of the transactions.
func (f *Filter) checkMatches(block map[string]interface{}) ([]*ethtypes.Log, error) {
	txHashes, ok := block["transactions"].([]common.Hash)
	if !ok {
		return nil, errors.New("invalid block transactions")
	}

	var candidates []*ethtypes.Log
	for _, txHash := range txHashes {
		txLogs, err := f.backend.GetTxLogs(txHash)
		if err != nil {
			return nil, err
		}
		candidates = append(candidates, txLogs...)
	}

	matched := filterLogs(candidates, f.fromBlock, f.toBlock, f.addresses, f.topics)
	return matched, nil
}
// filterLogs returns the subset of logs that satisfies every given criterion.
//
// A log is kept when its block number lies within [fromBlock, toBlock] (a nil
// or negative bound is ignored), its address is one of addresses (an empty
// list accepts any address), and its topics satisfy the positional rules:
//
//	[]               -> anything
//	[A]              -> A in first position of log topics, anything after
//	[null, B]        -> anything in first position, B in second position
//	[A, B]           -> A in first position and B in second position
//	[[A, B], [A, B]] -> A or B in first position, A or B in second position
//
// The result is nil when nothing matches.
func filterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log {
	var matched []*ethtypes.Log

	for _, entry := range logs {
		switch {
		case fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > entry.BlockNumber:
			// below the requested range
			continue
		case toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < entry.BlockNumber:
			// above the requested range
			continue
		case len(addresses) > 0 && !includes(addresses, entry.Address):
			// emitted by a contract that is not being watched
			continue
		case len(topics) > len(entry.Topics):
			// more positional rules than the log has topics
			continue
		}

		accepted := true
		for pos, alternatives := range topics {
			if len(alternatives) == 0 {
				// empty rule set == wildcard
				continue
			}

			found := false
			for _, want := range alternatives {
				if entry.Topics[pos] == want {
					found = true
					break
				}
			}

			if !found {
				accepted = false
				break
			}
		}

		if accepted {
			matched = append(matched, entry)
		}
	}

	return matched
}
// includes reports whether a is one of the given addresses.
func includes(addresses []common.Address, a common.Address) bool {
	for i := range addresses {
		if addresses[i] == a {
			return true
		}
	}
	return false
}