// Copyright 2015 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify // it under the terms of the GNU Lesser General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // The go-ethereum library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . package filters import ( "encoding/hex" "encoding/json" "errors" "fmt" "math/big" "sync" "time" "golang.org/x/net/context" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" ) var ( deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline ) // filter is a helper struct that holds meta information over the filter type // and associated subscription in the event system. type filter struct { typ Type deadline *time.Timer // filter is inactiv when deadline triggers hashes []common.Hash crit FilterCriteria logs []Log s *Subscription // associated subscription in event system } // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type PublicFilterAPI struct { backend Backend useMipMap bool mux *event.TypeMux quit chan struct{} chainDb ethdb.Database events *EventSystem filtersMu sync.Mutex filters map[rpc.ID]*filter } // NewPublicFilterAPI returns a new PublicFilterAPI instance. func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI { api := &PublicFilterAPI{ backend: backend, useMipMap: !lightMode, mux: backend.EventMux(), chainDb: backend.ChainDb(), events: NewEventSystem(backend.EventMux(), backend, lightMode), filters: make(map[rpc.ID]*filter), } go api.timeoutLoop() return api } // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used. // Tt is started when the api is created. func (api *PublicFilterAPI) timeoutLoop() { ticker := time.NewTicker(5 * time.Minute) for { <-ticker.C api.filtersMu.Lock() for id, f := range api.filters { select { case <-f.deadline.C: f.s.Unsubscribe() delete(api.filters, id) default: continue } } api.filtersMu.Unlock() } } // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes // as transactions enter the pending state. // // It is part of the filter package because this filter can be used throug the // `eth_getFilterChanges` polling method that is also used for log filters. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID { var ( pendingTxs = make(chan common.Hash) pendingTxSub = api.events.SubscribePendingTxEvents(pendingTxs) ) api.filtersMu.Lock() api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub} api.filtersMu.Unlock() go func() { for { select { case ph := <-pendingTxs: api.filtersMu.Lock() if f, found := api.filters[pendingTxSub.ID]; found { f.hashes = append(f.hashes, ph) } api.filtersMu.Unlock() case <-pendingTxSub.Err(): api.filtersMu.Lock() delete(api.filters, pendingTxSub.ID) api.filtersMu.Unlock() return } } }() return pendingTxSub.ID } // NewPendingTransactions creates a subscription that is triggered each time a transaction // enters the transaction pool and was signed from one of the transactions this nodes manages. func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } rpcSub := notifier.CreateSubscription() go func() { txHashes := make(chan common.Hash) pendingTxSub := api.events.SubscribePendingTxEvents(txHashes) for { select { case h := <-txHashes: notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): pendingTxSub.Unsubscribe() return case <-notifier.Closed(): pendingTxSub.Unsubscribe() return } } }() return rpcSub, nil } // NewBlockFilter creates a filter that fetches blocks that are imported into the chain. // It is part of the filter package since polling goes with eth_getFilterChanges. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter func (api *PublicFilterAPI) NewBlockFilter() rpc.ID { var ( headers = make(chan *types.Header) headerSub = api.events.SubscribeNewHeads(headers) ) api.filtersMu.Lock() api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub} api.filtersMu.Unlock() go func() { for { select { case h := <-headers: api.filtersMu.Lock() if f, found := api.filters[headerSub.ID]; found { f.hashes = append(f.hashes, h.Hash()) } api.filtersMu.Unlock() case <-headerSub.Err(): api.filtersMu.Lock() delete(api.filters, headerSub.ID) api.filtersMu.Unlock() return } } }() return headerSub.ID } // NewHeads send a notification each time a new (header) block is appended to the chain. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } rpcSub := notifier.CreateSubscription() go func() { headers := make(chan *types.Header) headersSub := api.events.SubscribeNewHeads(headers) for { select { case h := <-headers: notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): headersSub.Unsubscribe() return case <-notifier.Closed(): headersSub.Unsubscribe() return } } }() return rpcSub, nil } // Logs creates a subscription that fires for all new log that match the given filter criteria. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) { notifier, supported := rpc.NotifierFromContext(ctx) if !supported { return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } rpcSub := notifier.CreateSubscription() go func() { matchedLogs := make(chan []Log) logsSub := api.events.SubscribeLogs(crit, matchedLogs) for { select { case logs := <-matchedLogs: for _, log := range logs { notifier.Notify(rpcSub.ID, &log) } case <-rpcSub.Err(): // client send an unsubscribe request logsSub.Unsubscribe() return case <-notifier.Closed(): // connection dropped logsSub.Unsubscribe() return } } }() return rpcSub, nil } // FilterCriteria represents a request to create a new filter. type FilterCriteria struct { FromBlock *big.Int ToBlock *big.Int Addresses []common.Address Topics [][]common.Hash } // NewFilter creates a new filter and returns the filter id. It can be // used to retrieve logs when the state changes. This method cannot be // used to fetch logs that are already stored in the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { var ( logs = make(chan []Log) logsSub = api.events.SubscribeLogs(crit, logs) ) if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } api.filtersMu.Lock() api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub} api.filtersMu.Unlock() go func() { for { select { case l := <-logs: api.filtersMu.Lock() if f, found := api.filters[logsSub.ID]; found { f.logs = append(f.logs, l...) } api.filtersMu.Unlock() case <-logsSub.Err(): api.filtersMu.Lock() delete(api.filters, logsSub.ID) api.filtersMu.Unlock() return } } }() return logsSub.ID } // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]Log, error) { if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } filter := New(api.backend, api.useMipMap) filter.SetBeginBlock(crit.FromBlock.Int64()) filter.SetEndBlock(crit.ToBlock.Int64()) filter.SetAddresses(crit.Addresses) filter.SetTopics(crit.Topics) logs, err := filter.Find(ctx) return returnLogs(logs), err } // UninstallFilter removes the filter with the given filter id. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { api.filtersMu.Lock() f, found := api.filters[id] if found { delete(api.filters, id) } api.filtersMu.Unlock() if found { f.s.Unsubscribe() } return found } // GetFilterLogs returns the logs for the filter with the given id. // If the filter could not be found an empty array of logs is returned. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log, error) { api.filtersMu.Lock() f, found := api.filters[id] api.filtersMu.Unlock() if !found || f.typ != LogsSubscription { return []Log{}, nil } filter := New(api.backend, api.useMipMap) filter.SetBeginBlock(f.crit.FromBlock.Int64()) filter.SetEndBlock(f.crit.ToBlock.Int64()) filter.SetAddresses(f.crit.Addresses) filter.SetTopics(f.crit.Topics) logs, err := filter.Find(ctx) return returnLogs(logs), err } // GetFilterChanges returns the logs for the filter with the given id since // last time is was called. This can be used for polling. // // For pending transaction and block filters the result is []common.Hash. // (pending)Log filters return []Log. If the filter could not be found // []interface{}{} is returned. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { api.filtersMu.Lock() defer api.filtersMu.Unlock() if f, found := api.filters[id]; found { if !f.deadline.Stop() { // timer expired but filter is not yet removed in timeout loop // receive timer value and reset timer <-f.deadline.C } f.deadline.Reset(deadline) switch f.typ { case PendingTransactionsSubscription, BlocksSubscription: hashes := f.hashes f.hashes = nil return returnHashes(hashes) case PendingLogsSubscription, LogsSubscription: logs := f.logs f.logs = nil return returnLogs(logs) } } return []interface{}{} } // returnHashes is a helper that will return an empty hash array case the given hash array is nil, // otherwise the given hashes array is returned. func returnHashes(hashes []common.Hash) []common.Hash { if hashes == nil { return []common.Hash{} } return hashes } // returnLogs is a helper that will return an empty log array in case the given logs array is nil, // otherwise the given logs array is returned. func returnLogs(logs []Log) []Log { if logs == nil { return []Log{} } return logs } // UnmarshalJSON sets *args fields with given data. func (args *FilterCriteria) UnmarshalJSON(data []byte) error { type input struct { From *rpc.BlockNumber `json:"fromBlock"` ToBlock *rpc.BlockNumber `json:"toBlock"` Addresses interface{} `json:"address"` Topics []interface{} `json:"topics"` } var raw input if err := json.Unmarshal(data, &raw); err != nil { return err } if raw.From == nil || raw.From.Int64() < 0 { args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { args.FromBlock = big.NewInt(raw.From.Int64()) } if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } else { args.ToBlock = big.NewInt(raw.ToBlock.Int64()) } args.Addresses = []common.Address{} if raw.Addresses != nil { // raw.Address can contain a single address or an array of addresses var addresses []common.Address if strAddrs, ok := raw.Addresses.([]interface{}); ok { for i, addr := range strAddrs { if strAddr, ok := addr.(string); ok { if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') { strAddr = strAddr[2:] } if decAddr, err := hex.DecodeString(strAddr); err == nil { addresses = append(addresses, common.BytesToAddress(decAddr)) } else { return fmt.Errorf("invalid address given") } } else { return fmt.Errorf("invalid address on index %d", i) } } } else if singleAddr, ok := raw.Addresses.(string); ok { if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') { singleAddr = singleAddr[2:] } if decAddr, err := hex.DecodeString(singleAddr); err == nil { addresses = append(addresses, common.BytesToAddress(decAddr)) } else { return fmt.Errorf("invalid address given") } } else { return errors.New("invalid address(es) given") } args.Addresses = addresses } // helper function which parses a string to a topic hash topicConverter := func(raw string) (common.Hash, error) { if len(raw) == 0 { return common.Hash{}, nil } if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') { raw = raw[2:] } if len(raw) != 2*common.HashLength { return common.Hash{}, errors.New("invalid topic(s)") } if decAddr, err := hex.DecodeString(raw); err == nil { return common.BytesToHash(decAddr), nil } return common.Hash{}, errors.New("invalid topic(s)") } // topics is an array consisting of strings and/or arrays of strings. // JSON null values are converted to common.Hash{} and ignored by the filter manager. if len(raw.Topics) > 0 { args.Topics = make([][]common.Hash, len(raw.Topics)) for i, t := range raw.Topics { if t == nil { // ignore topic when matching logs args.Topics[i] = []common.Hash{common.Hash{}} } else if topic, ok := t.(string); ok { // match specific topic top, err := topicConverter(topic) if err != nil { return err } args.Topics[i] = []common.Hash{top} } else if topics, ok := t.([]interface{}); ok { // or case e.g. [null, "topic0", "topic1"] for _, rawTopic := range topics { if rawTopic == nil { args.Topics[i] = append(args.Topics[i], common.Hash{}) } else if topic, ok := rawTopic.(string); ok { parsed, err := topicConverter(topic) if err != nil { return err } args.Topics[i] = append(args.Topics[i], parsed) } else { return fmt.Errorf("invalid topic(s)") } } } else { return fmt.Errorf("invalid topic(s)") } } } return nil }