eth/filters: fix TestPendingLogsSubscription (#23619)

The test did not synchronize with per-case goroutines, and thus didn't notice
that some tests were just hanging. This change adds missing synchronization
and fixes the broken tests.
This commit is contained in:
Miro 2021-10-04 08:09:51 -04:00 committed by GitHub
parent 12f971fb2d
commit 5a0e1d88f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -506,58 +506,80 @@ func TestPendingLogsSubscription(t *testing.T) {
}, },
} }
pendingBlockNumber = big.NewInt(rpc.PendingBlockNumber.Int64())
testCases = []struct { testCases = []struct {
crit ethereum.FilterQuery crit ethereum.FilterQuery
expected []*types.Log expected []*types.Log
c chan []*types.Log c chan []*types.Log
sub *Subscription sub *Subscription
err chan error
}{ }{
// match all // match all
{ {
ethereum.FilterQuery{}, flattenLogs(allLogs), ethereum.FilterQuery{FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil, nil, flattenLogs(allLogs),
nil, nil, nil,
}, },
// match none due to no matching addresses // match none due to no matching addresses
{ {
ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil, nil,
nil, nil, nil, nil, nil,
}, },
// match logs based on addresses, ignore topics // match logs based on addresses, ignore topics
{ {
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[:2]), allLogs[5][3]), append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil, nil, nil, nil,
}, },
// match none due to no matching topics (match with address) // match none due to no matching topics (match with address)
{ {
ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
nil,
nil, nil, nil, nil, nil, nil,
}, },
// match logs based on addresses and topics // match logs based on addresses and topics
{ {
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[3:5]), allLogs[5][0]), append(flattenLogs(allLogs[3:5]), allLogs[5][0]),
nil, nil, nil, nil, nil,
}, },
// match logs based on multiple addresses and "or" topics // match logs based on multiple addresses and "or" topics
{ {
ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
append(flattenLogs(allLogs[2:5]), allLogs[5][0]), append(flattenLogs(allLogs[2:5]), allLogs[5][0]),
nil, nil, nil, nil,
nil,
},
// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
{
ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)},
append(flattenLogs(allLogs[:2]), allLogs[5][3]),
nil, nil,
}, },
// multiple pending logs, should match only 2 topics from the logs in block 5 // multiple pending logs, should match only 2 topics from the logs in block 5
{ {
ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}, FromBlock: pendingBlockNumber, ToBlock: pendingBlockNumber},
[]*types.Log{allLogs[5][0], allLogs[5][2]}, []*types.Log{allLogs[5][0], allLogs[5][2]},
nil, nil, nil, nil, nil,
},
// match none due to only matching new mined logs
{
ethereum.FilterQuery{},
nil,
nil, nil, nil,
},
// match none due to only matching mined logs within a specific block range
{
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)},
nil,
nil, nil, nil,
},
// match all due to matching mined and pending logs
{
ethereum.FilterQuery{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())},
flattenLogs(allLogs),
nil, nil, nil,
},
// match none due to matching logs from a specific block number to new mined blocks
{
ethereum.FilterQuery{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
nil,
nil, nil, nil,
}, },
} }
) )
@ -567,43 +589,69 @@ func TestPendingLogsSubscription(t *testing.T) {
// (some) events are posted. // (some) events are posted.
for i := range testCases { for i := range testCases {
testCases[i].c = make(chan []*types.Log) testCases[i].c = make(chan []*types.Log)
testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c) testCases[i].err = make(chan error)
var err error
testCases[i].sub, err = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
if err != nil {
t.Fatalf("SubscribeLogs %d failed: %v\n", i, err)
}
} }
for n, test := range testCases { for n, test := range testCases {
i := n i := n
tt := test tt := test
go func() { go func() {
defer tt.sub.Unsubscribe()
var fetched []*types.Log var fetched []*types.Log
timeout := time.After(1 * time.Second)
fetchLoop: fetchLoop:
for { for {
logs := <-tt.c select {
case logs := <-tt.c:
// Do not break early if we've fetched greater, or equal,
// to the number of logs expected. This ensures we do not
// deadlock the filter system because it will do a blocking
// send on this channel if another log arrives.
fetched = append(fetched, logs...) fetched = append(fetched, logs...)
if len(fetched) >= len(tt.expected) { case <-timeout:
break fetchLoop break fetchLoop
} }
} }
if len(fetched) != len(tt.expected) { if len(fetched) != len(tt.expected) {
panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))) tt.err <- fmt.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
return
} }
for l := range fetched { for l := range fetched {
if fetched[l].Removed { if fetched[l].Removed {
panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i)) tt.err <- fmt.Errorf("expected log not to be removed for log %d in case %d", l, i)
return
} }
if !reflect.DeepEqual(fetched[l], tt.expected[l]) { if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
panic(fmt.Sprintf("invalid log on index %d for case %d", l, i)) tt.err <- fmt.Errorf("invalid log on index %d for case %d\n", l, i)
return
} }
} }
tt.err <- nil
}() }()
} }
// raise events // raise events
time.Sleep(1 * time.Second)
for _, ev := range allLogs { for _, ev := range allLogs {
backend.pendingLogsFeed.Send(ev) backend.pendingLogsFeed.Send(ev)
} }
for i := range testCases {
err := <-testCases[i].err
if err != nil {
t.Fatalf("test %d failed: %v", i, err)
}
<-testCases[i].sub.Err()
}
} }
// TestPendingTxFilterDeadlock tests if the event loop hangs when pending // TestPendingTxFilterDeadlock tests if the event loop hangs when pending