eth/filter: add support for pending logs (#3219)

This commit is contained in:
bas-vk 2016-11-28 14:59:06 +01:00 committed by Jeffrey Wilcke
parent 318ad3c1e4
commit b5be6b72cb
7 changed files with 328 additions and 101 deletions

View File

@ -239,11 +239,17 @@ func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
} }
rpcSub := notifier.CreateSubscription() var (
rpcSub = notifier.CreateSubscription()
matchedLogs = make(chan []Log)
)
logsSub, err := api.events.SubscribeLogs(crit, matchedLogs)
if err != nil {
return nil, err
}
go func() { go func() {
matchedLogs := make(chan []Log)
logsSub := api.events.SubscribeLogs(crit, matchedLogs)
for { for {
select { select {
@ -276,18 +282,20 @@ type FilterCriteria struct {
// used to retrieve logs when the state changes. This method cannot be // used to retrieve logs when the state changes. This method cannot be
// used to fetch logs that are already stored in the state. // used to fetch logs that are already stored in the state.
// //
// Default criteria for the from and to block are "latest".
// Using "latest" as block number will return logs for mined blocks.
// Using "pending" as block number returns logs for not yet mined (pending) blocks.
// In case logs are removed (chain reorg) previously returned logs are returned
// again but with the removed property set to true.
//
// In case "fromBlock" > "toBlock" an error is returned.
//
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
var ( logs := make(chan []Log)
logs = make(chan []Log) logsSub, err := api.events.SubscribeLogs(crit, logs)
logsSub = api.events.SubscribeLogs(crit, logs) if err != nil {
) return rpc.ID(""), err
if crit.FromBlock == nil {
crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
}
if crit.ToBlock == nil {
crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} }
api.filtersMu.Lock() api.filtersMu.Lock()
@ -312,7 +320,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID {
} }
}() }()
return logsSub.ID return logsSub.ID, nil
} }
// GetLogs returns logs matching the given argument that are stored within the state. // GetLogs returns logs matching the given argument that are stored within the state.
@ -363,28 +371,38 @@ func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log
api.filtersMu.Unlock() api.filtersMu.Unlock()
if !found || f.typ != LogsSubscription { if !found || f.typ != LogsSubscription {
return []Log{}, nil return nil, fmt.Errorf("filter not found")
} }
filter := New(api.backend, api.useMipMap) filter := New(api.backend, api.useMipMap)
filter.SetBeginBlock(f.crit.FromBlock.Int64()) if f.crit.FromBlock != nil {
filter.SetEndBlock(f.crit.ToBlock.Int64()) filter.SetBeginBlock(f.crit.FromBlock.Int64())
} else {
filter.SetBeginBlock(rpc.LatestBlockNumber.Int64())
}
if f.crit.ToBlock != nil {
filter.SetEndBlock(f.crit.ToBlock.Int64())
} else {
filter.SetEndBlock(rpc.LatestBlockNumber.Int64())
}
filter.SetAddresses(f.crit.Addresses) filter.SetAddresses(f.crit.Addresses)
filter.SetTopics(f.crit.Topics) filter.SetTopics(f.crit.Topics)
logs, err := filter.Find(ctx) logs, err:= filter.Find(ctx)
return returnLogs(logs), err if err != nil {
return nil, err
}
return returnLogs(logs), nil
} }
// GetFilterChanges returns the logs for the filter with the given id since // GetFilterChanges returns the logs for the filter with the given id since
// last time is was called. This can be used for polling. // last time is was called. This can be used for polling.
// //
// For pending transaction and block filters the result is []common.Hash. // For pending transaction and block filters the result is []common.Hash.
// (pending)Log filters return []Log. If the filter could not be found // (pending)Log filters return []Log.
// []interface{}{} is returned.
// //
// https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} { func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
api.filtersMu.Lock() api.filtersMu.Lock()
defer api.filtersMu.Unlock() defer api.filtersMu.Unlock()
@ -400,15 +418,15 @@ func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) interface{} {
case PendingTransactionsSubscription, BlocksSubscription: case PendingTransactionsSubscription, BlocksSubscription:
hashes := f.hashes hashes := f.hashes
f.hashes = nil f.hashes = nil
return returnHashes(hashes) return returnHashes(hashes), nil
case PendingLogsSubscription, LogsSubscription: case LogsSubscription:
logs := f.logs logs := f.logs
f.logs = nil f.logs = nil
return returnLogs(logs) return returnLogs(logs), nil
} }
} }
return []interface{}{} return []interface{}{}, fmt.Errorf("filter not found")
} }
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, // returnHashes is a helper that will return an empty hash array case the given hash array is nil,
@ -443,15 +461,11 @@ func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
return err return err
} }
if raw.From == nil || raw.From.Int64() < 0 { if raw.From != nil {
args.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
args.FromBlock = big.NewInt(raw.From.Int64()) args.FromBlock = big.NewInt(raw.From.Int64())
} }
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 { if raw.ToBlock != nil {
args.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
} else {
args.ToBlock = big.NewInt(raw.ToBlock.Int64()) args.ToBlock = big.NewInt(raw.ToBlock.Int64())
} }

View File

@ -42,11 +42,11 @@ func TestUnmarshalJSONNewFilterArgs(t *testing.T) {
if err := json.Unmarshal([]byte("{}"), &test0); err != nil { if err := json.Unmarshal([]byte("{}"), &test0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if test0.FromBlock.Int64() != rpc.LatestBlockNumber.Int64() { if test0.FromBlock != nil {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.FromBlock) t.Fatalf("expected nil, got %d", test0.FromBlock)
} }
if test0.ToBlock.Int64() != rpc.LatestBlockNumber.Int64() { if test0.ToBlock != nil {
t.Fatalf("expected %d, got %d", rpc.LatestBlockNumber, test0.ToBlock) t.Fatalf("expected nil, got %d", test0.ToBlock)
} }
if len(test0.Addresses) != 0 { if len(test0.Addresses) != 0 {
t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses)) t.Fatalf("expected 0 addresses, got %d", len(test0.Addresses))

View File

@ -20,6 +20,8 @@ import (
"math" "math"
"time" "time"
"math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -162,7 +164,7 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, er
} }
unfiltered = append(unfiltered, rl...) unfiltered = append(unfiltered, rl...)
} }
logs = append(logs, filterLogs(unfiltered, f.addresses, f.topics)...) logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...)
} }
} }
@ -179,12 +181,18 @@ func includes(addresses []common.Address, a common.Address) bool {
return false return false
} }
func filterLogs(logs []Log, addresses []common.Address, topics [][]common.Hash) []Log { func filterLogs(logs []Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []Log {
var ret []Log var ret []Log
// Filter the logs for interesting stuff // Filter the logs for interesting stuff
Logs: Logs:
for _, log := range logs { for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && uint64(fromBlock.Int64()) > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && uint64(toBlock.Int64()) < log.BlockNumber {
continue
}
if len(addresses) > 0 && !includes(addresses, log.Address) { if len(addresses) > 0 && !includes(addresses, log.Address) {
continue continue
} }
@ -211,7 +219,6 @@ Logs:
continue Logs continue Logs
} }
} }
ret = append(ret, log) ret = append(ret, log)
} }

View File

@ -43,13 +43,17 @@ const (
UnknownSubscription Type = iota UnknownSubscription Type = iota
// LogsSubscription queries for new or removed (chain reorg) logs // LogsSubscription queries for new or removed (chain reorg) logs
LogsSubscription LogsSubscription
// PendingLogsSubscription queries for logs for the pending block // PendingLogsSubscription queries for logs in pending blocks
PendingLogsSubscription PendingLogsSubscription
// MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
MinedAndPendingLogsSubscription
// PendingTransactionsSubscription queries tx hashes for pending // PendingTransactionsSubscription queries tx hashes for pending
// transactions entering the pending state // transactions entering the pending state
PendingTransactionsSubscription PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported // BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription BlocksSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
) )
var ( var (
@ -63,19 +67,26 @@ type Log struct {
Removed bool `json:"removed"` Removed bool `json:"removed"`
} }
// MarshalJSON returns *l as the JSON encoding of l.
func (l *Log) MarshalJSON() ([]byte, error) { func (l *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{ fields := map[string]interface{}{
"address": l.Address, "address": l.Address,
"data": fmt.Sprintf("0x%x", l.Data), "data": fmt.Sprintf("0x%x", l.Data),
"blockNumber": fmt.Sprintf("%#x", l.BlockNumber), "blockNumber": nil,
"logIndex": fmt.Sprintf("%#x", l.Index), "logIndex": fmt.Sprintf("%#x", l.Index),
"blockHash": l.BlockHash, "blockHash": nil,
"transactionHash": l.TxHash, "transactionHash": l.TxHash,
"transactionIndex": fmt.Sprintf("%#x", l.TxIndex), "transactionIndex": fmt.Sprintf("%#x", l.TxIndex),
"topics": l.Topics, "topics": l.Topics,
"removed": l.Removed, "removed": l.Removed,
} }
// mined logs
if l.BlockHash != (common.Hash{}) {
fields["blockNumber"] = fmt.Sprintf("%#x", l.BlockNumber)
fields["blockHash"] = l.BlockHash
}
return json.Marshal(fields) return json.Marshal(fields)
} }
@ -169,8 +180,65 @@ func (es *EventSystem) subscribe(sub *subscription) *Subscription {
} }
// SubscribeLogs creates a subscription that will write all logs matching the // SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) (*Subscription, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
} else {
from = rpc.BlockNumber(crit.FromBlock.Int64())
}
if crit.ToBlock == nil {
to = rpc.LatestBlockNumber
} else {
to = rpc.BlockNumber(crit.ToBlock.Int64())
}
// only interested in pending logs
if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribePendingLogs(crit, logs), nil
}
// only interested in new mined logs
if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
// only interested in mined logs within a specific block range
if from >= 0 && to >= 0 && to >= from {
return es.subscribeLogs(crit, logs), nil
}
// interested in mined logs from a specific block number, new logs and pending logs
if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
return es.subscribeMinedPendingLogs(crit, logs), nil
}
// interested in logs from a specific block number to new mined blocks
if from >= 0 && to == rpc.LatestBlockNumber {
return es.subscribeLogs(crit, logs), nil
}
return nil, fmt.Errorf("invalid from and to block combination: from > to")
}
// subscribeMinedPendingLogs creates a subscription that returned mined and
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. // given criteria to the given logs channel.
func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription { func (es *EventSystem) subscribeLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{ sub := &subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: LogsSubscription, typ: LogsSubscription,
@ -186,9 +254,9 @@ func (es *EventSystem) SubscribeLogs(crit FilterCriteria, logs chan []Log) *Subs
return es.subscribe(sub) return es.subscribe(sub)
} }
// SubscribePendingLogs creates a subscription that will write pending logs matching the // subscribePendingLogs creates a subscription that writes transaction hashes for
// given criteria to the given channel. // transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription { func (es *EventSystem) subscribePendingLogs(crit FilterCriteria, logs chan []Log) *Subscription {
sub := &subscription{ sub := &subscription{
id: rpc.NewID(), id: rpc.NewID(),
typ: PendingLogsSubscription, typ: PendingLogsSubscription,
@ -204,23 +272,6 @@ func (es *EventSystem) SubscribePendingLogs(crit FilterCriteria, logs chan []Log
return es.subscribe(sub) return es.subscribe(sub)
} }
// SubscribePendingTxEvents creates a sbuscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeNewHeads creates a subscription that writes the header of a block that is // SubscribeNewHeads creates a subscription that writes the header of a block that is
// imported in the chain. // imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription { func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
@ -238,6 +289,23 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
return es.subscribe(sub) return es.subscribe(sub)
} }
// SubscribePendingTxEvents creates a subscription that writes transaction hashes for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
type filterIndex map[Type]map[rpc.ID]*subscription type filterIndex map[Type]map[rpc.ID]*subscription
// broadcast event to filters that match criteria. // broadcast event to filters that match criteria.
@ -251,7 +319,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
if len(e) > 0 { if len(e) > 0 {
for _, f := range filters[LogsSubscription] { for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) { if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { if matchedLogs := filterLogs(convertLogs(e, false), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs f.logs <- matchedLogs
} }
} }
@ -260,7 +328,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.RemovedLogsEvent: case core.RemovedLogsEvent:
for _, f := range filters[LogsSubscription] { for _, f := range filters[LogsSubscription] {
if ev.Time.After(f.created) { if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { if matchedLogs := filterLogs(convertLogs(e.Logs, true), f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs f.logs <- matchedLogs
} }
} }
@ -268,7 +336,7 @@ func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) {
case core.PendingLogsEvent: case core.PendingLogsEvent:
for _, f := range filters[PendingLogsSubscription] { for _, f := range filters[PendingLogsSubscription] {
if ev.Time.After(f.created) { if ev.Time.After(f.created) {
if matchedLogs := filterLogs(convertLogs(e.Logs, false), f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 { if matchedLogs := filterLogs(convertLogs(e.Logs, false), nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
f.logs <- matchedLogs f.logs <- matchedLogs
} }
} }
@ -351,8 +419,8 @@ func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.
} }
unfiltered = append(unfiltered, rl...) unfiltered = append(unfiltered, rl...)
} }
logs := filterLogs(unfiltered, addresses, topics)
//fmt.Println("found", len(logs)) logs := filterLogs(unfiltered, nil, nil, addresses, topics)
return logs return logs
} }
return nil return nil
@ -364,6 +432,11 @@ func (es *EventSystem) eventLoop() {
index = make(filterIndex) index = make(filterIndex)
sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{}) sub = es.mux.Subscribe(core.PendingLogsEvent{}, core.RemovedLogsEvent{}, vm.Logs{}, core.TxPreEvent{}, core.ChainEvent{})
) )
for i := UnknownSubscription; i < LastIndexSubscription; i++ {
index[i] = make(map[rpc.ID]*subscription)
}
for { for {
select { select {
case ev, active := <-sub.Chan(): case ev, active := <-sub.Chan():
@ -372,13 +445,22 @@ func (es *EventSystem) eventLoop() {
} }
es.broadcast(index, ev) es.broadcast(index, ev)
case f := <-es.install: case f := <-es.install:
if _, found := index[f.typ]; !found { if f.typ == MinedAndPendingLogsSubscription {
index[f.typ] = make(map[rpc.ID]*subscription) // the type are logs and pending logs subscriptions
index[LogsSubscription][f.id] = f
index[PendingLogsSubscription][f.id] = f
} else {
index[f.typ][f.id] = f
} }
index[f.typ][f.id] = f
close(f.installed) close(f.installed)
case f := <-es.uninstall: case f := <-es.uninstall:
delete(index[f.typ], f.id) if f.typ == MinedAndPendingLogsSubscription {
// the type are logs and pending logs subscriptions
delete(index[LogsSubscription], f.id)
delete(index[PendingLogsSubscription], f.id)
} else {
delete(index[f.typ], f.id)
}
close(f.err) close(f.err)
} }
} }
@ -386,6 +468,7 @@ func (es *EventSystem) eventLoop() {
// convertLogs is a helper utility that converts vm.Logs to []filter.Log. // convertLogs is a helper utility that converts vm.Logs to []filter.Log.
func convertLogs(in vm.Logs, removed bool) []Log { func convertLogs(in vm.Logs, removed bool) []Log {
logs := make([]Log, len(in)) logs := make([]Log, len(in))
for i, l := range in { for i, l := range in {
logs[i] = Log{l, removed} logs[i] = Log{l, removed}

View File

@ -34,13 +34,6 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
)
type testBackend struct { type testBackend struct {
mux *event.TypeMux mux *event.TypeMux
db ethdb.Database db ethdb.Database
@ -81,6 +74,11 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel() t.Parallel()
var ( var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
genesis = core.WriteGenesisBlockForTesting(db) genesis = core.WriteGenesisBlockForTesting(db)
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {}) chain, _ = core.GenerateChain(params.TestChainConfig, genesis, db, 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{} chainEvents = []core.ChainEvent{}
@ -130,6 +128,11 @@ func TestPendingTxFilter(t *testing.T) {
t.Parallel() t.Parallel()
var ( var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
transactions = []*types.Transaction{ transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil), types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), new(big.Int), new(big.Int), nil),
@ -150,9 +153,13 @@ func TestPendingTxFilter(t *testing.T) {
} }
for { for {
h := api.GetFilterChanges(fid0).([]common.Hash) results, err := api.GetFilterChanges(fid0)
hashes = append(hashes, h...) if err != nil {
t.Fatalf("Unable to retrieve logs: %v", err)
}
h := results.([]common.Hash)
hashes = append(hashes, h...)
if len(hashes) >= len(transactions) { if len(hashes) >= len(transactions) {
break break
} }
@ -167,11 +174,86 @@ func TestPendingTxFilter(t *testing.T) {
} }
} }
// TestLogFilterCreation test whether a given filter criteria makes sense.
// If not it must return an error.
func TestLogFilterCreation(t *testing.T) {
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
testCases = []struct {
crit FilterCriteria
success bool
}{
// defaults
{FilterCriteria{}, true},
// valid block number range
{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
// "mined" block range to pending
{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
// new mined and pending blocks
{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
// from block "higher" than to block
{FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
// from block "higher" than to block
{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
// from block "higher" than to block
{FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
// from block "higher" than to block
{FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false},
}
)
for i, test := range testCases {
_, err := api.NewFilter(test.crit)
if test.success && err != nil {
t.Errorf("expected filter creation for case %d to success, got %v", i, err)
}
if !test.success && err == nil {
t.Errorf("expected testcase %d to fail with an error", i)
}
}
}
// TestInvalidLogFilterCreation tests whether invalid filter log criteria results in an error
// when the filter is created.
func TestInvalidLogFilterCreation(t *testing.T) {
t.Parallel()
var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
)
// different situations where log filter creation should fail.
// Reason: fromBlock > toBlock
testCases := []FilterCriteria{
0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)},
2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)},
}
for i, test := range testCases {
if _, err := api.NewFilter(test); err == nil {
t.Errorf("Expected NewFilter for case #%d to fail", i)
}
}
}
// TestLogFilter tests whether log filters match the correct logs that are posted to the event mux. // TestLogFilter tests whether log filters match the correct logs that are posted to the event mux.
func TestLogFilter(t *testing.T) { func TestLogFilter(t *testing.T) {
t.Parallel() t.Parallel()
var ( var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
@ -180,8 +262,8 @@ func TestLogFilter(t *testing.T) {
secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999") notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
// posted twice, once as vm.Logs and once as core.PendingLogsEvent
allLogs = vm.Logs{ allLogs = vm.Logs{
// Note, these are used for comparison of the test cases.
vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0), vm.NewLog(firstAddr, []common.Hash{}, []byte(""), 0),
vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1), vm.NewLog(firstAddr, []common.Hash{firstTopic}, []byte(""), 1),
vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1), vm.NewLog(secondAddr, []common.Hash{firstTopic}, []byte(""), 1),
@ -189,45 +271,64 @@ func TestLogFilter(t *testing.T) {
vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3), vm.NewLog(thirdAddress, []common.Hash{secondTopic}, []byte(""), 3),
} }
expectedCase7 = vm.Logs{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
expectedCase11 = vm.Logs{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}
testCases = []struct { testCases = []struct {
crit FilterCriteria crit FilterCriteria
expected vm.Logs expected vm.Logs
id rpc.ID id rpc.ID
}{ }{
// match all // match all
{FilterCriteria{}, allLogs, ""}, 0: {FilterCriteria{}, allLogs, ""},
// match none due to no matching addresses // match none due to no matching addresses
{FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""}, 1: {FilterCriteria{Addresses: []common.Address{common.Address{}, notUsedAddress}, Topics: [][]common.Hash{allLogs[0].Topics}}, vm.Logs{}, ""},
// match logs based on addresses, ignore topics // match logs based on addresses, ignore topics
{FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""}, 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
// match none due to no matching topics (match with address) // match none due to no matching topics (match with address)
{FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""}, 3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{[]common.Hash{notUsedTopic}}}, vm.Logs{}, ""},
// match logs based on addresses and topics // match logs based on addresses and topics
{FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""}, 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[3:5], ""},
// match logs based on multiple addresses and "or" topics // match logs based on multiple addresses and "or" topics
{FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""}, 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{[]common.Hash{firstTopic, secondTopic}}}, allLogs[2:5], ""},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criterias when the state changes // logs in the pending block
{FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, allLogs[:2], ""}, 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
// mined logs with block num >= 2 or pending logs
7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
// all "mined" logs with block num >= 2
8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
// all "mined" logs
9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
// all "mined" logs with 1>= block num <=2 and topic secondTopic
10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{[]common.Hash{secondTopic}}}, allLogs[3:4], ""},
// all "mined" and pending logs with topic firstTopic
11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{[]common.Hash{firstTopic}}}, expectedCase11, ""},
} }
err error
) )
// create all filters // create all filters
for i := range testCases { for i := range testCases {
testCases[i].id = api.NewFilter(testCases[i].crit) testCases[i].id, _ = api.NewFilter(testCases[i].crit)
} }
// raise events // raise events
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
if err = mux.Post(allLogs); err != nil { if err := mux.Post(allLogs); err != nil {
t.Fatal(err)
}
if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
for i, tt := range testCases { for i, tt := range testCases {
var fetched []Log var fetched []Log
for { // fetch all expected logs for { // fetch all expected logs
fetched = append(fetched, api.GetFilterChanges(tt.id).([]Log)...) results, err := api.GetFilterChanges(tt.id)
if err != nil {
t.Fatalf("Unable to fetch logs: %v", err)
}
fetched = append(fetched, results.([]Log)...)
if len(fetched) >= len(tt.expected) { if len(fetched) >= len(tt.expected) {
break break
} }
@ -247,7 +348,6 @@ func TestLogFilter(t *testing.T) {
if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) { if !reflect.DeepEqual(fetched[l].Log, tt.expected[l]) {
t.Errorf("invalid log on index %d for case %d", l, i) t.Errorf("invalid log on index %d for case %d", l, i)
} }
} }
} }
} }
@ -257,6 +357,11 @@ func TestPendingLogsSubscription(t *testing.T) {
t.Parallel() t.Parallel()
var ( var (
mux = new(event.TypeMux)
db, _ = ethdb.NewMemDatabase()
backend = &testBackend{mux, db}
api = NewPublicFilterAPI(backend, false)
firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111") firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222") secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333") thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
@ -319,7 +424,7 @@ func TestPendingLogsSubscription(t *testing.T) {
// (some) events are posted. // (some) events are posted.
for i := range testCases { for i := range testCases {
testCases[i].c = make(chan []Log) testCases[i].c = make(chan []Log)
testCases[i].sub = api.events.SubscribePendingLogs(testCases[i].crit, testCases[i].c) testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
} }
for n, test := range testCases { for n, test := range testCases {

View File

@ -31,6 +31,7 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/event"
) )
func makeReceipt(addr common.Address) *types.Receipt { func makeReceipt(addr common.Address) *types.Receipt {
@ -51,6 +52,7 @@ func BenchmarkMipmaps(b *testing.B) {
var ( var (
db, _ = ethdb.NewLDBDatabase(dir, 0, 0) db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
mux = new(event.TypeMux)
backend = &testBackend{mux, db} backend = &testBackend{mux, db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr1 = crypto.PubkeyToAddress(key1.PublicKey)
@ -126,6 +128,7 @@ func TestFilters(t *testing.T) {
var ( var (
db, _ = ethdb.NewLDBDatabase(dir, 0, 0) db, _ = ethdb.NewLDBDatabase(dir, 0, 0)
mux = new(event.TypeMux)
backend = &testBackend{mux, db} backend = &testBackend{mux, db}
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey) addr = crypto.PubkeyToAddress(key1.PublicKey)

View File

@ -262,6 +262,7 @@ func newLocalMinedBlock(blockNumber uint64, prevMinedBlocks *uint64RingBuffer) (
func (self *worker) wait() { func (self *worker) wait() {
for { for {
mustCommitNewWork := true
for result := range self.recv { for result := range self.recv {
atomic.AddInt32(&self.atWork, -1) atomic.AddInt32(&self.atWork, -1)
@ -315,6 +316,8 @@ func (self *worker) wait() {
core.WriteReceipts(self.chainDb, work.receipts) core.WriteReceipts(self.chainDb, work.receipts)
// Write map map bloom filters // Write map map bloom filters
core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts) core.WriteMipmapBloom(self.chainDb, block.NumberU64(), work.receipts)
// implicit by posting ChainHeadEvent
mustCommitNewWork = false
} }
// broadcast before waiting for validation // broadcast before waiting for validation
@ -343,7 +346,9 @@ func (self *worker) wait() {
} }
glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm) glog.V(logger.Info).Infof("🔨 Mined %sblock (#%v / %x). %s", stale, block.Number(), block.Hash().Bytes()[:4], confirm)
self.commitNewWork() if mustCommitNewWork {
self.commitNewWork()
}
} }
} }
} }
@ -451,6 +456,7 @@ func (self *worker) commitNewWork() {
tstart := time.Now() tstart := time.Now()
parent := self.chain.CurrentBlock() parent := self.chain.CurrentBlock()
tstamp := tstart.Unix() tstamp := tstart.Unix()
if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 { if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
tstamp = parent.Time().Int64() + 1 tstamp = parent.Time().Int64() + 1
@ -618,7 +624,16 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
txs.Shift() txs.Shift()
} }
} }
if len(coalescedLogs) > 0 || env.tcount > 0 { if len(coalescedLogs) > 0 || env.tcount > 0 {
// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
// logs by filling in the block hash when the block was mined by the local miner. This can
// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
cpy := make(vm.Logs, len(coalescedLogs))
for i, l := range coalescedLogs {
cpy[i] = new(vm.Log)
*cpy[i] = *l
}
go func(logs vm.Logs, tcount int) { go func(logs vm.Logs, tcount int) {
if len(logs) > 0 { if len(logs) > 0 {
mux.Post(core.PendingLogsEvent{Logs: logs}) mux.Post(core.PendingLogsEvent{Logs: logs})
@ -626,7 +641,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB
if tcount > 0 { if tcount > 0 {
mux.Post(core.PendingStateEvent{}) mux.Post(core.PendingStateEvent{})
} }
}(coalescedLogs, env.tcount) }(cpy, env.tcount)
} }
} }