From b5be6b72cb06ded22075a41db6abc670d33c5ea7 Mon Sep 17 00:00:00 2001 From: bas-vk Date: Mon, 28 Nov 2016 14:59:06 +0100 Subject: [PATCH] eth/filter: add support for pending logs (#3219) --- eth/filters/api.go | 80 ++++++++------- eth/filters/api_test.go | 8 +- eth/filters/filter.go | 15 ++- eth/filters/filter_system.go | 149 +++++++++++++++++++++------- eth/filters/filter_system_test.go | 155 +++++++++++++++++++++++++----- eth/filters/filter_test.go | 3 + miner/worker.go | 19 +++- 7 files changed, 328 insertions(+), 101 deletions(-) diff --git a/eth/filters/api.go b/eth/filters/api.go index 584f55afd..d5dd57743 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported } - rpcSub := notifier.CreateSubscription() + var ( + rpcSub = notifier.CreateSubscription() + matchedLogs = make(chan []Log) + ) + + logsSub, err := api.events.SubscribeLogs(crit, matchedLogs) + if err != nil { + return nil, err + } go func() { - matchedLogs := make(chan []Log) - logsSub := api.events.SubscribeLogs(crit, matchedLogs) for { select { @@ -276,18 +282,20 @@ type FilterCriteria struct { // used to retrieve logs when the state changes. This method cannot be // used to fetch logs that are already stored in the state. // +// Default criteria for the from and to block are "latest". +// Using "latest" as block number will return logs for mined blocks. +// Using "pending" as block number returns logs for not yet mined (pending) blocks. +// In case logs are removed (chain reorg) previously returned logs are returned +// again but with the removed property set to true. +// +// In case "fromBlock" > "toBlock" an error is returned. +// // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter -func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { - var ( - logs = make(chan []Log) - logsSub = api.events.SubscribeLogs(crit, logs) - ) - - if crit.FromBlock == nil { - crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } - if crit.ToBlock == nil { - crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) +func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) { + logs := make(chan []Log) + logsSub, err := api.events.SubscribeLogs(crit, logs) + if err != nil { + return rpc.ID(""), err } api.filtersMu.Lock() @@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { } }() - return logsSub.ID + return logsSub.ID, nil } // GetLogs returns logs matching the given argument that are stored within the state. @@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log api.filtersMu.Unlock() if !found || f.typ != LogsSubscription { - return []Log{}, nil + return nil, fmt.Errorf("filter not found") } filter := New(api.backend, api.useMipMap) - filter.SetBeginBlock(f.crit.FromBlock.Int64()) - filter.SetEndBlock(f.crit.ToBlock.Int64()) + if f.crit.FromBlock != nil { + filter.SetBeginBlock(f.crit.FromBlock.Int64()) + } else { + filter.SetBeginBlock(rpc.LatestBlockNumber.Int64()) + } + if f.crit.ToBlock != nil { + filter.SetEndBlock(f.crit.ToBlock.Int64()) + } else { + filter.SetEndBlock(rpc.LatestBlockNumber.Int64()) + } filter.SetAddresses(f.crit.Addresses) filter.SetTopics(f.crit.Topics) - logs, err := filter.Find(ctx) - return returnLogs(logs), err + logs, err:= filter.Find(ctx) + if err != nil { + return nil, err + } + return returnLogs(logs), nil } // GetFilterChanges returns the logs for the filter with the given id since // last time is was called. This can be used for polling. // // For pending transaction and block filters the result is []common.Hash. -// (pending)Log filters return []Log. If the filter could not be found -// []interface{}{} is returned. +// (pending)Log filters return []Log. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges -func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { +func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) { api.filtersMu.Lock() defer api.filtersMu.Unlock() @@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { case PendingTransactionsSubscription, BlocksSubscription: hashes := f.hashes f.hashes = nil - return returnHashes(hashes) - case PendingLogsSubscription, LogsSubscription: + return returnHashes(hashes), nil + case LogsSubscription: logs := f.logs f.logs = nil - return returnLogs(logs) + return returnLogs(logs), nil } } - return []interface{}{} + return []interface{}{}, fmt.Errorf("filter not found") } // returnHashes is a helper that will return an empty hash array case the given hash array is nil, @@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error { return err } - if raw.From == nil || raw.From.Int64() < 0 { - args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } else { + if raw.From != nil { args.FromBlock = big.NewInt(raw.From.Int64()) } - if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { - args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) - } else { + if raw.ToBlock != nil { args.ToBlock = big.NewInt(raw.ToBlock.Int64()) } diff --git a/eth/filters/api_test.go b/eth/filters/api_test.go index 98eb6cbaa..068a5ea24 100644 --- a/eth/filters/api_test.go +++ b/eth/filters/api_test.go @@ -42,11 +42,11 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) { if err := json.Unmarshal([]byte("{}"), &test0); err != nil { t.Fatal(err) } - if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() { - t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock) + if test0.FromBlock != nil { + t.Fatalf("expected nil, got %d", test0.FromBlock) } - if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() { - t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock) + if test0.ToBlock != nil { + t.Fatalf("expected nil, got %d", test0.ToBlock) } if len(test0.Addresses) != 0 { t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses)) diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 4004af300..ce7383fb3 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -20,6 +20,8 @@ import ( "math" "time" + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -162,7 +164,7 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er } unfiltered = append(unfiltered, rl...) } - logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...) + logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...) } } @@ -179,12 +181,18 @@ func includes(addresses []common.Address, a common.Address) bool { return false } -func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log { +func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log { var ret []Log - // Filter the logs for interesting stuff Logs: for _, log := range logs { + if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber { + continue + } + if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber { + continue + } + if len(addresses) > 0 && !includes(addresses, log.Address) { continue } @@ -211,7 +219,6 @@ Logs: continue Logs } } - ret = append(ret, log) } diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index c2c072a9f..b59718aea 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -43,13 +43,17 @@ const ( UnknownSubscription Type = iota // LogsSubscription queries for new or removed (chain reorg) logs LogsSubscription - // PendingLogsSubscription queries for logs for the pending block + // PendingLogsSubscription queries for logs in pending blocks PendingLogsSubscription + // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks. + MinedAndPendingLogsSubscription // PendingTransactionsSubscription queries tx hashes for pending // transactions entering the pending state PendingTransactionsSubscription // BlocksSubscription queries hashes for blocks that are imported BlocksSubscription + // LastSubscription keeps track of the last index + LastIndexSubscription ) var ( @@ -63,19 +67,26 @@ type Log struct { Removed bool `json:"removed"` } +// MarshalJSON returns *l as the JSON encoding of l. func (l *Log) MarshalJSON() ([]byte, error) { fields := map[string]interface{}{ "address": l.Address, "data": fmt.Sprintf("0x%x", l.Data), - "blockNumber": fmt.Sprintf("%#x", l.BlockNumber), + "blockNumber": nil, "logIndex": fmt.Sprintf("%#x", l.Index), - "blockHash": l.BlockHash, + "blockHash": nil, "transactionHash": l.TxHash, "transactionIndex": fmt.Sprintf("%#x", l.TxIndex), "topics": l.Topics, "removed": l.Removed, } + // mined logs + if l.BlockHash != (common.Hash{}) { + fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber) + fields["blockHash"] = l.BlockHash + } + return json.Marshal(fields) } @@ -169,8 +180,65 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription { } // SubscribeLogs creates a subscription that will write all logs matching the +// given criteria to the given logs channel. Default value for the from and to +// block is "latest". If the fromBlock > toBlock an error is returned. +func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) { + var from, to rpc.BlockNumber + if crit.FromBlock == nil { + from = rpc.LatestBlockNumber + } else { + from = rpc.BlockNumber(crit.FromBlock.Int64()) + } + if crit.ToBlock == nil { + to = rpc.LatestBlockNumber + } else { + to = rpc.BlockNumber(crit.ToBlock.Int64()) + } + + // only interested in pending logs + if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribePendingLogs(crit, logs), nil + } + // only interested in new mined logs + if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + // only interested in mined logs within a specific block range + if from >= 0 && to >= 0 && to >= from { + return es.subscribeLogs(crit, logs), nil + } + // interested in mined logs from a specific block number, new logs and pending logs + if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber { + return es.subscribeMinedPendingLogs(crit, logs), nil + } + // interested in logs from a specific block number to new mined blocks + if from >= 0 && to == rpc.LatestBlockNumber { + return es.subscribeLogs(crit, logs), nil + } + return nil, fmt.Errorf("invalid from and to block combination: from > to") +} + +// subscribeMinedPendingLogs creates a subscription that returned mined and +// pending logs that match the given criteria. +func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: MinedAndPendingLogsSubscription, + logsCrit: crit, + created: time.Now(), + logs: logs, + hashes: make(chan common.Hash), + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + +// subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. -func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { +func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: LogsSubscription, @@ -186,9 +254,9 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subs return es.subscribe(sub) } -// SubscribePendingLogs creates a subscription that will write pending logs matching the -// given criteria to the given channel. -func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { +// subscribePendingLogs creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { sub := &subscription{ id: rpc.NewID(), typ: PendingLogsSubscription, @@ -204,23 +272,6 @@ func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log return es.subscribe(sub) } -// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for -// transactions that enter the transaction pool. -func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { - sub := &subscription{ - id: rpc.NewID(), - typ: PendingTransactionsSubscription, - created: time.Now(), - logs: make(chan []Log), - hashes: hashes, - headers: make(chan *types.Header), - installed: make(chan struct{}), - err: make(chan error), - } - - return es.subscribe(sub) -} - // SubscribeNewHeads creates a subscription that writes the header of a block that is // imported in the chain. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { @@ -238,6 +289,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti return es.subscribe(sub) } +// SubscribePendingTxEvents creates a subscription that writes transaction hashes for +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: PendingTransactionsSubscription, + created: time.Now(), + logs: make(chan []Log), + hashes: hashes, + headers: make(chan *types.Header), + installed: make(chan struct{}), + err: make(chan error), + } + + return es.subscribe(sub) +} + type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. @@ -251,7 +319,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { if len(e) > 0 { for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -260,7 +328,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.RemovedLogsEvent: for _, f := range filters[LogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -268,7 +336,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { case core.PendingLogsEvent: for _, f := range filters[PendingLogsSubscription] { if ev.Time.After(f.created) { - if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { + if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { f.logs <- matchedLogs } } @@ -351,8 +419,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common. } unfiltered = append(unfiltered, rl...) } - logs := filterLogs(unfiltered, addresses, topics) - //fmt.Println("found", len(logs)) + + logs := filterLogs(unfiltered, nil, nil, addresses, topics) return logs } return nil @@ -364,6 +432,11 @@ func (es *EventSystem) eventLoop() { index = make(filterIndex) sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) ) + + for i := UnknownSubscription; i < LastIndexSubscription; i++ { + index[i] = make(map[rpc.ID]*subscription) + } + for { select { case ev, active := <-sub.Chan(): @@ -372,13 +445,22 @@ func (es *EventSystem) eventLoop() { } es.broadcast(index, ev) case f := <-es.install: - if _, found := index[f.typ]; !found { - index[f.typ] = make(map[rpc.ID]*subscription) + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + index[LogsSubscription][f.id] = f + index[PendingLogsSubscription][f.id] = f + } else { + index[f.typ][f.id] = f } - index[f.typ][f.id] = f close(f.installed) case f := <-es.uninstall: - delete(index[f.typ], f.id) + if f.typ == MinedAndPendingLogsSubscription { + // the type are logs and pending logs subscriptions + delete(index[LogsSubscription], f.id) + delete(index[PendingLogsSubscription], f.id) + } else { + delete(index[f.typ], f.id) + } close(f.err) } } @@ -386,6 +468,7 @@ func (es *EventSystem) eventLoop() { // convertLogs is a helper utility that converts vm.Logs to []filter.Log. func convertLogs(in vm.Logs, removed bool) []Log { + logs := make([]Log, len(in)) for i, l := range in { logs[i] = Log{l, removed} diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index d6d4199cc..e8591a2e4 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -34,13 +34,6 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) -var ( - mux = new(event.TypeMux) - db, _ = ethdb.NewMemDatabase() - backend = &testBackend{mux, db} - api = NewPublicFilterAPI(backend, false) -) - type testBackend struct { mux *event.TypeMux db ethdb.Database @@ -81,6 +74,11 @@ func TestBlockSubscription(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + genesis = core.WriteGenesisBlockForTesting(db) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) chainEvents = []core.ChainEvent{} @@ -130,6 +128,11 @@ func TestPendingTxFilter(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + transactions = []*types.Transaction{ types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), @@ -150,9 +153,13 @@ func TestPendingTxFilter(t *testing.T) { } for { - h := api.GetFilterChanges(fid0).([]common.Hash) - hashes = append(hashes, h...) + results, err := api.GetFilterChanges(fid0) + if err != nil { + t.Fatalf("Unable to retrieve logs: %v", err) + } + h := results.([]common.Hash) + hashes = append(hashes, h...) if len(hashes) >= len(transactions) { break } @@ -167,11 +174,86 @@ func TestPendingTxFilter(t *testing.T) { } } +// TestLogFilterCreation test whether a given filter criteria makes sense. +// If not it must return an error. +func TestLogFilterCreation(t *testing.T) { + var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + + testCases = []struct { + crit FilterCriteria + success bool + }{ + // defaults + {FilterCriteria{}, true}, + // valid block number range + {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true}, + // "mined" block range to pending + {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true}, + // new mined and pending blocks + {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false}, + // from block "higher" than to block + {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false}, + } + ) + + for i, test := range testCases { + _, err := api.NewFilter(test.crit) + if test.success && err != nil { + t.Errorf("expected filter creation for case %d to success, got %v", i, err) + } + if !test.success && err == nil { + t.Errorf("expected testcase %d to fail with an error", i) + } + } +} + +// TestInvalidLogFilterCreation tests whether invalid filter log criteria results in an error +// when the filter is created. +func TestInvalidLogFilterCreation(t *testing.T) { + t.Parallel() + + var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + ) + + // different situations where log filter creation should fail. + // Reason: fromBlock > toBlock + testCases := []FilterCriteria{ + 0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, + 1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, + 2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, + } + + for i, test := range testCases { + if _, err := api.NewFilter(test); err == nil { + t.Errorf("Expected NewFilter for case #%d to fail", i) + } + } +} + // TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. func TestLogFilter(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") @@ -180,8 +262,8 @@ func TestLogFilter(t *testing.T) { secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") + // posted twice, once as vm.Logs and once as core.PendingLogsEvent allLogs = vm.Logs{ - // Note, these are used for comparison of the test cases. vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0), vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1), vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1), @@ -189,45 +271,64 @@ func TestLogFilter(t *testing.T) { vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3), } + expectedCase7 = vm.Logs{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]} + expectedCase11 = vm.Logs{allLogs[1], allLogs[2], allLogs[1], allLogs[2]} + testCases = []struct { crit FilterCriteria expected vm.Logs id rpc.ID }{ // match all - {FilterCriteria{}, allLogs, ""}, + 0: {FilterCriteria{}, allLogs, ""}, // match none due to no matching addresses - {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, + 1: {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, // match logs based on addresses, ignore topics - {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, + 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, // match none due to no matching topics (match with address) - {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, + 3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, // match logs based on addresses and topics - {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, + 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, // match logs based on multiple addresses and "or" topics - {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, - // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes - {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""}, + 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, + // logs in the pending block + 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""}, + // mined logs with block num >= 2 or pending logs + 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""}, + // all "mined" logs with block num >= 2 + 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""}, + // all "mined" logs + 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""}, + // all "mined" logs with 1>= block num <=2 and topic secondTopic + 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{[]common.Hash{secondTopic}}}, allLogs[3:4], ""}, + // all "mined" and pending logs with topic firstTopic + 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{[]common.Hash{firstTopic}}}, expectedCase11, ""}, } - - err error ) // create all filters for i := range testCases { - testCases[i].id = api.NewFilter(testCases[i].crit) + testCases[i].id, _ = api.NewFilter(testCases[i].crit) } // raise events time.Sleep(1 * time.Second) - if err = mux.Post(allLogs); err != nil { + if err := mux.Post(allLogs); err != nil { + t.Fatal(err) + } + if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil { t.Fatal(err) } for i, tt := range testCases { var fetched []Log for { // fetch all expected logs - fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...) + results, err := api.GetFilterChanges(tt.id) + if err != nil { + t.Fatalf("Unable to fetch logs: %v", err) + } + + fetched = append(fetched, results.([]Log)...) if len(fetched) >= len(tt.expected) { break } @@ -247,7 +348,6 @@ func TestLogFilter(t *testing.T) { if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { t.Errorf("invalid log on index %d for case %d", l, i) } - } } } @@ -257,6 +357,11 @@ func TestPendingLogsSubscription(t *testing.T) { t.Parallel() var ( + mux = new(event.TypeMux) + db, _ = ethdb.NewMemDatabase() + backend = &testBackend{mux, db} + api = NewPublicFilterAPI(backend, false) + firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") @@ -319,7 +424,7 @@ func TestPendingLogsSubscription(t *testing.T) { // (some) events are posted. for i := range testCases { testCases[i].c = make(chan []Log) - testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c) + testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) } for n, test := range testCases { diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index a8c767ead..ab6a87851 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -31,6 +31,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/event" ) func makeReceipt(addr common.Address) *types.Receipt { @@ -51,6 +52,7 @@ func BenchmarkMipmaps(b *testing.B) { var ( db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) backend = &testBackend{mux, db} key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) @@ -126,6 +128,7 @@ func TestFilters(t *testing.T) { var ( db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + mux = new(event.TypeMux) backend = &testBackend{mux, db} key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) diff --git a/miner/worker.go b/miner/worker.go index 2933b6bd3..ca00c7229 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -262,6 +262,7 @@ func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) ( func (self *worker) wait() { for { + mustCommitNewWork := true for result := range self.recv { atomic.AddInt32(&self.atWork, -1) @@ -315,6 +316,8 @@ func (self *worker) wait() { core.WriteReceipts(self.chainDb, work.receipts) // Write map map bloom filters core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts) + // implicit by posting ChainHeadEvent + mustCommitNewWork = false } // broadcast before waiting for validation @@ -343,7 +346,9 @@ func (self *worker) wait() { } glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm) - self.commitNewWork() + if mustCommitNewWork { + self.commitNewWork() + } } } } @@ -451,6 +456,7 @@ func (self *worker) commitNewWork() { tstart := time.Now() parent := self.chain.CurrentBlock() + tstamp := tstart.Unix() if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 { tstamp = parent.Time().Int64() + 1 @@ -618,7 +624,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB txs.Shift() } } + if len(coalescedLogs) > 0 || env.tcount > 0 { + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make(vm.Logs, len(coalescedLogs)) + for i, l := range coalescedLogs { + cpy[i] = new(vm.Log) + *cpy[i] = *l + } go func(logs vm.Logs, tcount int) { if len(logs) > 0 { mux.Post(core.PendingLogsEvent{Logs: logs}) @@ -626,7 +641,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB if tcount > 0 { mux.Post(core.PendingStateEvent{}) } - }(coalescedLogs, env.tcount) + }(cpy, env.tcount) } }