package rpc import ( "errors" "math/big" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth/filters" "github.com/ethereum/go-ethereum/log" ) /* - Filter functions derived from go-ethereum Used to set the criteria passed in from RPC params */ const blockFilter = "block" const pendingTxFilter = "pending" const logFilter = "log" // Filter can be used to retrieve and filter logs, blocks, or pending transactions. type Filter struct { backend Backend fromBlock, toBlock *big.Int // start and end block numbers addresses []common.Address // contract addresses to watch topics [][]common.Hash // log topics to watch for blockHash *common.Hash // Block hash if filtering a single block typ string hashes []common.Hash // filtered block or transaction hashes logs []*ethtypes.Log //nolint // filtered logs stopped bool // set to true once filter in uninstalled err error } // NewFilter returns a new Filter func NewFilter(backend Backend, criteria *filters.FilterCriteria) *Filter { filter := &Filter{ backend: backend, fromBlock: criteria.FromBlock, toBlock: criteria.ToBlock, addresses: criteria.Addresses, topics: criteria.Topics, typ: logFilter, stopped: false, } return filter } // NewFilterWithBlockHash returns a new Filter with a blockHash. func NewFilterWithBlockHash(backend Backend, criteria *filters.FilterCriteria) *Filter { return &Filter{ backend: backend, fromBlock: criteria.FromBlock, toBlock: criteria.ToBlock, addresses: criteria.Addresses, topics: criteria.Topics, blockHash: criteria.BlockHash, typ: logFilter, } } // NewBlockFilter creates a new filter that notifies when a block arrives. func NewBlockFilter(backend Backend) *Filter { filter := NewFilter(backend, &filters.FilterCriteria{}) filter.typ = blockFilter go func() { err := filter.pollForBlocks() if err != nil { filter.err = err } }() return filter } func (f *Filter) pollForBlocks() error { prev := hexutil.Uint64(0) for { if f.stopped { return nil } num, err := f.backend.BlockNumber() if err != nil { return err } if num == prev { continue } block, err := f.backend.GetBlockByNumber(BlockNumber(num), false) if err != nil { return err } hashBytes, ok := block["hash"].(hexutil.Bytes) if !ok { return errors.New("could not convert block hash to hexutil.Bytes") } hash := common.BytesToHash(hashBytes) f.hashes = append(f.hashes, hash) prev = num // TODO: should we add a delay? } } func (f *Filter) pollForTransactions() error { for { if f.stopped { return nil } txs, err := f.backend.PendingTransactions() if err != nil { return err } for _, tx := range txs { if !contains(f.hashes, tx.Hash) { f.hashes = append(f.hashes, tx.Hash) } } <-time.After(1 * time.Second) } } func contains(slice []common.Hash, item common.Hash) bool { set := make(map[common.Hash]struct{}, len(slice)) for _, s := range slice { set[s] = struct{}{} } _, ok := set[item] return ok } // NewPendingTransactionFilter creates a new filter that notifies when a pending transaction arrives. func NewPendingTransactionFilter(backend Backend) *Filter { filter := NewFilter(backend, &filters.FilterCriteria{}) filter.typ = pendingTxFilter go func() { err := filter.pollForTransactions() if err != nil { filter.err = err } }() return filter } func (f *Filter) uninstallFilter() { f.stopped = true } func (f *Filter) getFilterChanges() (interface{}, error) { switch f.typ { case blockFilter: if f.err != nil { return nil, f.err } blocks := make([]common.Hash, len(f.hashes)) copy(blocks, f.hashes) f.hashes = []common.Hash{} return blocks, nil case pendingTxFilter: if f.err != nil { return nil, f.err } txs := make([]common.Hash, len(f.hashes)) copy(txs, f.hashes) f.hashes = []common.Hash{} return txs, nil case logFilter: return f.getFilterLogs() } return nil, errors.New("unsupported filter") } func (f *Filter) getFilterLogs() ([]*ethtypes.Log, error) { ret := []*ethtypes.Log{} // filter specific block only if f.blockHash != nil { block, err := f.backend.GetBlockByHash(*f.blockHash, true) if err != nil { return nil, err } // if the logsBloom == 0, there are no logs in that block if txs, ok := block["transactions"].([]common.Hash); !ok { return ret, nil } else if len(txs) != 0 { return f.checkMatches(block) } } // filter range of blocks num, err := f.backend.BlockNumber() if err != nil { return nil, err } // if f.fromBlock is set to 0, set it to the latest block number if f.fromBlock == nil || f.fromBlock.Cmp(big.NewInt(0)) == 0 { f.fromBlock = big.NewInt(int64(num)) } // if f.toBlock is set to 0, set it to the latest block number if f.toBlock == nil || f.toBlock.Cmp(big.NewInt(0)) == 0 { f.toBlock = big.NewInt(int64(num)) } log.Debug("[ethAPI] Retrieving filter logs", "fromBlock", f.fromBlock, "toBlock", f.toBlock, "topics", f.topics, "addresses", f.addresses) from := f.fromBlock.Int64() to := f.toBlock.Int64() for i := from; i <= to; i++ { block, err := f.backend.GetBlockByNumber(NewBlockNumber(big.NewInt(i)), true) if err != nil { f.err = err log.Debug("[ethAPI] Cannot get block", "block", block["number"], "error", err) break } log.Debug("[ethAPI] filtering", "block", block) // TODO: block logsBloom is often set in the wrong block // if the logsBloom == 0, there are no logs in that block if txs, ok := block["transactions"].([]common.Hash); !ok { continue } else if len(txs) != 0 { logs, err := f.checkMatches(block) if err != nil { f.err = err break } ret = append(ret, logs...) } } return ret, nil } func (f *Filter) checkMatches(block map[string]interface{}) ([]*ethtypes.Log, error) { transactions, ok := block["transactions"].([]common.Hash) if !ok { return nil, errors.New("invalid block transactions") } unfiltered := []*ethtypes.Log{} for _, tx := range transactions { logs, err := f.backend.GetTransactionLogs(common.BytesToHash(tx[:])) if err != nil { return nil, err } unfiltered = append(unfiltered, logs...) } return filterLogs(unfiltered, f.fromBlock, f.toBlock, f.addresses, f.topics), nil } // filterLogs creates a slice of logs matching the given criteria. // [] -> anything // [A] -> A in first position of log topics, anything after // [null, B] -> anything in first position, B in second position // [A, B] -> A in first position and B in second position // [[A, B], [A, B]] -> A or B in first position, A or B in second position func filterLogs(logs []*ethtypes.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*ethtypes.Log { var ret []*ethtypes.Log Logs: for _, log := range logs { if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber { continue } if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber { continue } if len(addresses) > 0 && !includes(addresses, log.Address) { continue } // If the to filtered topics is greater than the amount of topics in logs, skip. if len(topics) > len(log.Topics) { continue Logs } for i, sub := range topics { match := len(sub) == 0 // empty rule set == wildcard for _, topic := range sub { if log.Topics[i] == topic { match = true break } } if !match { continue Logs } } ret = append(ret, log) } return ret } func includes(addresses []common.Address, a common.Address) bool { for _, addr := range addresses { if addr == a { return true } } return false }