core, core/vm, eth/filters: move Removed field into vm.Log

This field used to be assigned by the filter system and returned through
the RPC API. Now that we have a Go client that uses the underlying type,
the field needs to move. It is now assigned to true when the RemovedLogs
event is generated so the filter system doesn't need to care about the
field at all.

While here, remove the log list from ChainSideEvent. There are no users
of this field right now and any potential users could subscribe to
RemovedLogsEvent instead.
This commit is contained in:
Felix Lange 2016-12-04 19:07:24 +01:00
parent 3bc0fe1ee3
commit f52a1ae849
8 changed files with 252 additions and 165 deletions

View File

@ -988,7 +988,7 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
glog.Infof("inserted forked block #%d [%x…] (TD=%v) in %9v: %3d txs %d uncles.", block.Number(), block.Hash().Bytes()[0:4], block.Difficulty(), common.PrettyDuration(time.Since(bstart)), len(block.Transactions()), len(block.Uncles()))
}
blockInsertTimer.UpdateSince(bstart)
events = append(events, ChainSideEvent{block, logs})
events = append(events, ChainSideEvent{block})
case SplitStatTy:
events = append(events, ChainSplitEvent{block, logs})
@ -1062,24 +1062,25 @@ func countTransactions(chain []*types.Block) (c int) {
// event about them
func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
var (
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
oldStart = oldBlock
newStart = newBlock
deletedTxs types.Transactions
deletedLogs vm.Logs
deletedLogsByHash = make(map[common.Hash]vm.Logs)
newChain types.Blocks
oldChain types.Blocks
commonBlock *types.Block
oldStart = oldBlock
newStart = newBlock
deletedTxs types.Transactions
deletedLogs vm.Logs
// collectLogs collects the logs that were generated during the
// processing of the block that corresponds with the given hash.
// These logs are later announced as deleted.
collectLogs = func(h common.Hash) {
// Coalesce logs
// Coalesce logs and set 'Removed'.
receipts := GetBlockReceipts(self.chainDb, h, self.hc.GetBlockNumber(h))
for _, receipt := range receipts {
deletedLogs = append(deletedLogs, receipt.Logs...)
deletedLogsByHash[h] = receipt.Logs
for _, log := range receipt.Logs {
del := *log
del.Removed = true
deletedLogs = append(deletedLogs, &del)
}
}
}
)
@ -1173,7 +1174,7 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
if len(oldChain) > 0 {
go func() {
for _, block := range oldChain {
self.eventMux.Post(ChainSideEvent{Block: block, Logs: deletedLogsByHash[block.Hash()]})
self.eventMux.Post(ChainSideEvent{Block: block})
}
}()
}

View File

@ -61,7 +61,6 @@ type ChainEvent struct {
type ChainSideEvent struct {
Block *types.Block
Logs vm.Logs
}
type PendingBlockEvent struct {

View File

@ -29,20 +29,42 @@ import (
var errMissingLogFields = errors.New("missing required JSON log fields")
// Log represents a contract log event. These events are generated by the LOG
// opcode and stored/indexed by the node.
// Log represents a contract log event. These events are generated by the LOG opcode and
// stored/indexed by the node.
type Log struct {
// Consensus fields.
Address common.Address // address of the contract that generated the event
Topics []common.Hash // list of topics provided by the contract.
Data []byte // supplied by the contract, usually ABI-encoded
// Derived fields (don't reorder!).
// Derived fields. These fields are filled in by the node
// but not secured by consensus.
BlockNumber uint64 // block in which the transaction was included
TxHash common.Hash // hash of the transaction
TxIndex uint // index of the transaction in the block
BlockHash common.Hash // hash of the block in which the transaction was included
Index uint // index of the log in the receipt
// The Removed field is true if this log was reverted due to a chain reorganisation.
// You must pay attention to this field if you receive logs through a filter query.
Removed bool
}
type rlpLog struct {
Address common.Address
Topics []common.Hash
Data []byte
}
type rlpStorageLog struct {
Address common.Address
Topics []common.Hash
Data []byte
BlockNumber uint64
TxHash common.Hash
TxIndex uint
BlockHash common.Hash
Index uint
}
type jsonLog struct {
@ -54,27 +76,26 @@ type jsonLog struct {
TxHash *common.Hash `json:"transactionHash"`
BlockHash *common.Hash `json:"blockHash"`
Index *hexutil.Uint `json:"logIndex"`
Removed bool `json:"removed"`
}
func NewLog(address common.Address, topics []common.Hash, data []byte, number uint64) *Log {
return &Log{Address: address, Topics: topics, Data: data, BlockNumber: number}
}
// EncodeRLP implements rlp.Encoder.
func (l *Log) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{l.Address, l.Topics, l.Data})
return rlp.Encode(w, rlpLog{Address: l.Address, Topics: l.Topics, Data: l.Data})
}
// DecodeRLP implements rlp.Decoder.
func (l *Log) DecodeRLP(s *rlp.Stream) error {
var log struct {
Address common.Address
Topics []common.Hash
Data []byte
var dec rlpLog
err := s.Decode(&dec)
if err == nil {
l.Address, l.Topics, l.Data = dec.Address, dec.Topics, dec.Data
}
if err := s.Decode(&log); err != nil {
return err
}
l.Address, l.Topics, l.Data = log.Address, log.Topics, log.Data
return nil
return err
}
func (l *Log) String() string {
@ -82,45 +103,88 @@ func (l *Log) String() string {
}
// MarshalJSON implements json.Marshaler.
func (r *Log) MarshalJSON() ([]byte, error) {
return json.Marshal(&jsonLog{
Address: &r.Address,
Topics: &r.Topics,
Data: (*hexutil.Bytes)(&r.Data),
BlockNumber: (*hexutil.Uint64)(&r.BlockNumber),
TxIndex: (*hexutil.Uint)(&r.TxIndex),
TxHash: &r.TxHash,
BlockHash: &r.BlockHash,
Index: (*hexutil.Uint)(&r.Index),
})
func (l *Log) MarshalJSON() ([]byte, error) {
jslog := &jsonLog{
Address: &l.Address,
Topics: &l.Topics,
Data: (*hexutil.Bytes)(&l.Data),
TxIndex: (*hexutil.Uint)(&l.TxIndex),
TxHash: &l.TxHash,
Index: (*hexutil.Uint)(&l.Index),
Removed: l.Removed,
}
// Set block information for mined logs.
if (l.BlockHash != common.Hash{}) {
jslog.BlockHash = &l.BlockHash
jslog.BlockNumber = (*hexutil.Uint64)(&l.BlockNumber)
}
return json.Marshal(jslog)
}
// UnmarshalJSON implements json.Umarshaler.
func (r *Log) UnmarshalJSON(input []byte) error {
func (l *Log) UnmarshalJSON(input []byte) error {
var dec jsonLog
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
if dec.Address == nil || dec.Topics == nil || dec.Data == nil || dec.BlockNumber == nil ||
dec.TxIndex == nil || dec.TxHash == nil || dec.BlockHash == nil || dec.Index == nil {
if dec.Address == nil || dec.Topics == nil || dec.Data == nil ||
dec.TxIndex == nil || dec.TxHash == nil || dec.Index == nil {
return errMissingLogFields
}
*r = Log{
Address: *dec.Address,
Topics: *dec.Topics,
Data: *dec.Data,
BlockNumber: uint64(*dec.BlockNumber),
TxHash: *dec.TxHash,
TxIndex: uint(*dec.TxIndex),
BlockHash: *dec.BlockHash,
Index: uint(*dec.Index),
declog := Log{
Address: *dec.Address,
Topics: *dec.Topics,
Data: *dec.Data,
TxHash: *dec.TxHash,
TxIndex: uint(*dec.TxIndex),
Index: uint(*dec.Index),
Removed: dec.Removed,
}
// Block information may be missing if the log is received through
// the pending log filter, so it's handled specially here.
if dec.BlockHash != nil && dec.BlockNumber != nil {
declog.BlockHash = *dec.BlockHash
declog.BlockNumber = uint64(*dec.BlockNumber)
}
*l = declog
return nil
}
type Logs []*Log
// LogForStorage is a wrapper around a Log that flattens and parses the entire
// content of a log, as opposed to only the consensus fields originally (by hiding
// the rlp interface methods).
// LogForStorage is a wrapper around a Log that flattens and parses the entire content of
// a log including non-consensus fields.
type LogForStorage Log
// EncodeRLP implements rlp.Encoder.
func (l *LogForStorage) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, rlpStorageLog{
Address: l.Address,
Topics: l.Topics,
Data: l.Data,
BlockNumber: l.BlockNumber,
TxHash: l.TxHash,
TxIndex: l.TxIndex,
BlockHash: l.BlockHash,
Index: l.Index,
})
}
// DecodeRLP implements rlp.Decoder.
func (l *LogForStorage) DecodeRLP(s *rlp.Stream) error {
var dec rlpStorageLog
err := s.Decode(&dec)
if err == nil {
*l = LogForStorage{
Address: dec.Address,
Topics: dec.Topics,
Data: dec.Data,
BlockNumber: dec.BlockNumber,
TxHash: dec.TxHash,
TxIndex: dec.TxIndex,
BlockHash: dec.BlockHash,
Index: dec.Index,
}
}
return err
}

View File

@ -18,18 +18,81 @@ package vm
import (
"encoding/json"
"reflect"
"testing"
"github.com/davecgh/go-spew/spew"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
var unmarshalLogTests = map[string]struct {
input string
want *Log
wantError error
}{
"ok": {
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x000000000000000000000000000000000000000000000001a055690d9db80000","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x000000000000000000000000000000000000000000000001a055690d9db80000","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
want: &Log{
Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"),
BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"),
BlockNumber: 2019236,
Data: hexutil.MustDecode("0x000000000000000000000000000000000000000000000001a055690d9db80000"),
Index: 2,
TxIndex: 3,
TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"),
Topics: []common.Hash{
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"),
},
},
},
"empty data": {
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
want: &Log{
Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"),
BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"),
BlockNumber: 2019236,
Data: []byte{},
Index: 2,
TxIndex: 3,
TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"),
Topics: []common.Hash{
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
common.HexToHash("0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615"),
},
},
},
"missing block fields (pending logs)": {
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","data":"0x","logIndex":"0x0","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
want: &Log{
Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"),
BlockHash: common.Hash{},
BlockNumber: 0,
Data: []byte{},
Index: 0,
TxIndex: 3,
TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"),
Topics: []common.Hash{
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
},
},
},
"Removed: true": {
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","data":"0x","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3","removed":true}`,
want: &Log{
Address: common.HexToAddress("0xecf8f87f810ecf450940c9f60066b4a7a501d6a7"),
BlockHash: common.HexToHash("0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056"),
BlockNumber: 2019236,
Data: []byte{},
Index: 2,
TxIndex: 3,
TxHash: common.HexToHash("0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e"),
Topics: []common.Hash{
common.HexToHash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
},
Removed: true,
},
},
"missing data": {
input: `{"address":"0xecf8f87f810ecf450940c9f60066b4a7a501d6a7","blockHash":"0x656c34545f90a730a19008c0e7a7cd4fb3895064b48d6d69761bd5abad681056","blockNumber":"0x1ecfa4","logIndex":"0x2","topics":["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef","0x00000000000000000000000080b2c9d7cbbf30a1b0fc8983c647d754c6525615","0x000000000000000000000000f9dff387dcb5cc4cca5b91adb07a95f54e9f1bb6"],"transactionHash":"0x3b198bfd5d2907285af009e9ae84a0ecd63677110d89d7e030251acb87f6487e","transactionIndex":"0x3"}`,
@ -38,10 +101,16 @@ var unmarshalLogTests = map[string]struct {
}
func TestUnmarshalLog(t *testing.T) {
dumper := spew.ConfigState{DisableMethods: true, Indent: " "}
for name, test := range unmarshalLogTests {
var log *Log
err := json.Unmarshal([]byte(test.input), &log)
checkError(t, name, err, test.wantError)
if test.wantError == nil && err == nil {
if !reflect.DeepEqual(log, test.want) {
t.Errorf("test %q:\nGOT %sWANT %s", name, dumper.Sdump(log), dumper.Sdump(test.want))
}
}
}
}

View File

@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
@ -45,7 +46,7 @@ type filter struct {
deadline *time.Timer // filter is inactiv when deadline triggers
hashes []common.Hash
crit FilterCriteria
logs []Log
logs []*vm.Log
s *Subscription // associated subscription in event system
}
@ -241,7 +242,7 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
var (
rpcSub = notifier.CreateSubscription()
matchedLogs = make(chan []Log)
matchedLogs = make(chan []*vm.Log)
)
logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
@ -292,14 +293,14 @@ type FilterCriteria struct {
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
logs := make(chan []Log)
logs := make(chan []*vm.Log)
logsSub, err := api.events.SubscribeLogs(crit, logs)
if err != nil {
return rpc.ID(""), err
}
api.filtersMu.Lock()
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]Log, 0), s: logsSub}
api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*vm.Log, 0), s: logsSub}
api.filtersMu.Unlock()
go func() {
@ -326,7 +327,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
// GetLogs returns logs matching the given argument that are stored within the state.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]Log, error) {
func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*vm.Log, error) {
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
@ -365,7 +366,7 @@ func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
// If the filter could not be found an empty array of logs is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log, error) {
func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*vm.Log, error) {
api.filtersMu.Lock()
f, found := api.filters[id]
api.filtersMu.Unlock()
@ -388,7 +389,7 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log
filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics)
logs, err:= filter.Find(ctx)
logs, err := filter.Find(ctx)
if err != nil {
return nil, err
}
@ -440,9 +441,9 @@ func returnHashes(hashes []common.Hash) []common.Hash {
// returnLogs is a helper that will return an empty log array in case the given logs array is nil,
// otherwise the given logs array is returned.
func returnLogs(logs []Log) []Log {
func returnLogs(logs []*vm.Log) []*vm.Log {
if logs == nil {
return []Log{}
return []*vm.Log{}
}
return logs
}

View File

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
@ -38,7 +39,7 @@ type Backend interface {
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
}
// Filter can be used to retrieve and filter logs
// Filter can be used to retrieve and filter logs.
type Filter struct {
backend Backend
useMipMap bool
@ -85,7 +86,7 @@ func (f *Filter) SetTopics(topics [][]common.Hash) {
}
// Run filters logs with the current parameters set
func (f *Filter) Find(ctx context.Context) ([]Log, error) {
func (f *Filter) Find(ctx context.Context) ([]*vm.Log, error) {
head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
if head == nil {
return nil, nil
@ -110,7 +111,7 @@ func (f *Filter) Find(ctx context.Context) ([]Log, error) {
return f.mipFind(beginBlockNo, endBlockNo, 0), nil
}
func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) {
func (f *Filter) mipFind(start, end uint64, depth int) (logs []*vm.Log) {
level := core.MIPMapLevels[depth]
// normalise numerator so we can work in level specific batches and
// work with the proper range checks
@ -141,7 +142,7 @@ func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) {
return logs
}
func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, err error) {
func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*vm.Log, err error) {
for i := start; i <= end; i++ {
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(i))
if header == nil || err != nil {
@ -156,13 +157,9 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er
if err != nil {
return nil, err
}
var unfiltered []Log
var unfiltered []*vm.Log
for _, receipt := range receipts {
rl := make([]Log, len(receipt.Logs))
for i, l := range receipt.Logs {
rl[i] = Log{l, false}
}
unfiltered = append(unfiltered, rl...)
unfiltered = append(unfiltered, ([]*vm.Log)(receipt.Logs)...)
}
logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...)
}
@ -181,15 +178,15 @@ func includes(addresses []common.Address, a common.Address) bool {
return false
}
func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log {
var ret []Log
// Filter the logs for interesting stuff
// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []*vm.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*vm.Log {
var ret []*vm.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber {
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}

View File

@ -19,7 +19,6 @@
package filters
import (
"encoding/json"
"errors"
"fmt"
"sync"
@ -60,42 +59,12 @@ var (
ErrInvalidSubscriptionID = errors.New("invalid id")
)
// Log is a helper that can hold additional information about vm.Log
// necessary for the RPC interface.
type Log struct {
*vm.Log
Removed bool `json:"removed"`
}
// MarshalJSON returns *l as the JSON encoding of l.
func (l *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{
"address": l.Address,
"data": fmt.Sprintf("0x%x", l.Data),
"blockNumber": nil,
"logIndex": fmt.Sprintf("%#x", l.Index),
"blockHash": nil,
"transactionHash": l.TxHash,
"transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
"topics": l.Topics,
"removed": l.Removed,
}
// mined logs
if l.BlockHash != (common.Hash{}) {
fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber)
fields["blockHash"] = l.BlockHash
}
return json.Marshal(fields)
}
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit FilterCriteria
logs chan []Log
logs chan []*vm.Log
hashes chan common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
@ -182,7 +151,7 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) {
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []*vm.Log) (*Subscription, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
@ -220,7 +189,7 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Sub
// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
@ -238,7 +207,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan
// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
@ -256,7 +225,7 @@ func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subs
// subscribePendingLogs creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []*vm.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
@ -279,7 +248,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []Log),
logs: make(chan []*vm.Log),
hashes: make(chan common.Hash),
headers: headers,
installed: make(chan struct{}),
@ -296,7 +265,7 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []Log),
logs: make(chan []*vm.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
@ -319,7 +288,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if len(e) > 0 {
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@ -328,7 +297,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@ -336,7 +305,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.PendingLogsEvent:
for _, f := range filters[PendingLogsSubscription] {
if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
if matchedLogs := filterLogs(e.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@ -401,25 +370,22 @@ func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func
}
// filter logs of a single header in light client mode
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []Log {
//fmt.Println("lightFilterLogs", header.Number.Uint64(), remove)
func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*vm.Log {
if bloomFilter(header.Bloom, addresses, topics) {
//fmt.Println("bloom match")
// Get the logs of the block
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
receipts, err := es.backend.GetReceipts(ctx, header.Hash())
if err != nil {
return nil
}
var unfiltered []Log
var unfiltered []*vm.Log
for _, receipt := range receipts {
rl := make([]Log, len(receipt.Logs))
for i, l := range receipt.Logs {
rl[i] = Log{l, remove}
for _, log := range receipt.Logs {
logcopy := *log
logcopy.Removed = remove
unfiltered = append(unfiltered, &logcopy)
}
unfiltered = append(unfiltered, rl...)
}
logs := filterLogs(unfiltered, nil, nil, addresses, topics)
return logs
}
@ -465,13 +431,3 @@ func (es *EventSystem) eventLoop() {
}
}
}
// convertLogs is a helper utility that converts vm.Logs to []filter.Log.
func convertLogs(in vm.Logs, removed bool) []Log {
logs := make([]Log, len(in))
for i, l := range in {
logs[i] = Log{l, removed}
}
return logs
}

View File

@ -74,10 +74,10 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
genesis = core.WriteGenesisBlockForTesting(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
@ -128,10 +128,10 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@ -178,10 +178,10 @@ func TestPendingTxFilter(t *testing.T) {
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
@ -223,10 +223,10 @@ func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
@ -249,10 +249,10 @@ func TestLogFilter(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -321,14 +321,14 @@ func TestLogFilter(t *testing.T) {
}
for i, tt := range testCases {
var fetched []Log
var fetched []*vm.Log
for { // fetch all expected logs
results, err := api.GetFilterChanges(tt.id)
if err != nil {
t.Fatalf("Unable to fetch logs: %v", err)
}
fetched = append(fetched, results.([]Log)...)
fetched = append(fetched, results.([]*vm.Log)...)
if len(fetched) >= len(tt.expected) {
break
}
@ -345,7 +345,7 @@ func TestLogFilter(t *testing.T) {
if fetched[l].Removed {
t.Errorf("expected log not to be removed for log %d in case %d", l, i)
}
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
}
@ -357,10 +357,10 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
@ -397,7 +397,7 @@ func TestPendingLogsSubscription(t *testing.T) {
testCases = []struct {
crit FilterCriteria
expected vm.Logs
c chan []Log
c chan []*vm.Log
sub *Subscription
}{
// match all
@ -423,7 +423,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// on slow machines this could otherwise lead to missing events when the subscription is created after
// (some) events are posted.
for i := range testCases {
testCases[i].c = make(chan []Log)
testCases[i].c = make(chan []*vm.Log)
testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
}
@ -431,7 +431,7 @@ func TestPendingLogsSubscription(t *testing.T) {
i := n
tt := test
go func() {
var fetched []Log
var fetched []*vm.Log
fetchLoop:
for {
logs := <-tt.c
@ -449,7 +449,7 @@ func TestPendingLogsSubscription(t *testing.T) {
if fetched[l].Removed {
t.Errorf("expected log not to be removed for log %d in case %d", l, i)
}
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i)
}
}