Merge pull request #15489 from karalabe/bloombits-shifted-start
core/bloombits: handle non 8-bit boundary section matches
This commit is contained in:
commit
bb57f1f1e5
@ -192,10 +192,12 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
|
|||||||
}
|
}
|
||||||
// Iterate over all the blocks in the section and return the matching ones
|
// Iterate over all the blocks in the section and return the matching ones
|
||||||
for i := first; i <= last; i++ {
|
for i := first; i <= last; i++ {
|
||||||
// Skip the entire byte if no matches are found inside
|
// Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
|
||||||
next := res.bitset[(i-sectionStart)/8]
|
next := res.bitset[(i-sectionStart)/8]
|
||||||
if next == 0 {
|
if next == 0 {
|
||||||
i += 7
|
if i%8 == 0 {
|
||||||
|
i += 7
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Some bit it set, do the actual submatching
|
// Some bit it set, do the actual submatching
|
||||||
|
@ -56,33 +56,48 @@ func TestMatcherWildcards(t *testing.T) {
|
|||||||
|
|
||||||
// Tests the matcher pipeline on a single continuous workflow without interrupts.
|
// Tests the matcher pipeline on a single continuous workflow without interrupts.
|
||||||
func TestMatcherContinuous(t *testing.T) {
|
func TestMatcherContinuous(t *testing.T) {
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, false, 75)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, false, 75)
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, false, 81)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, false, 81)
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, false, 36)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, false, 36)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
|
// Tests the matcher pipeline on a constantly interrupted and resumed work pattern
|
||||||
// with the aim of ensuring data items are requested only once.
|
// with the aim of ensuring data items are requested only once.
|
||||||
func TestMatcherIntermittent(t *testing.T) {
|
func TestMatcherIntermittent(t *testing.T) {
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 100000, true, 75)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{10, 20, 30}}}, 0, 100000, true, 75)
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 100000, true, 81)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{32, 3125, 100}}, {{40, 50, 10}}}, 0, 100000, true, 81)
|
||||||
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 10000, true, 36)
|
testMatcherDiffBatches(t, [][]bloomIndexes{{{4, 8, 11}, {7, 8, 17}}, {{9, 9, 12}, {15, 20, 13}}, {{18, 15, 15}, {12, 10, 4}}}, 0, 10000, true, 36)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tests the matcher pipeline on random input to hopefully catch anomalies.
|
// Tests the matcher pipeline on random input to hopefully catch anomalies.
|
||||||
func TestMatcherRandom(t *testing.T) {
|
func TestMatcherRandom(t *testing.T) {
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 10000, 0)
|
testMatcherBothModes(t, makeRandomIndexes([]int{1}, 50), 0, 10000, 0)
|
||||||
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 10000, 0)
|
testMatcherBothModes(t, makeRandomIndexes([]int{3}, 50), 0, 10000, 0)
|
||||||
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 10000, 0)
|
testMatcherBothModes(t, makeRandomIndexes([]int{2, 2, 2}, 20), 0, 10000, 0)
|
||||||
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 10000, 0)
|
testMatcherBothModes(t, makeRandomIndexes([]int{5, 5, 5}, 50), 0, 10000, 0)
|
||||||
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 10000, 0)
|
testMatcherBothModes(t, makeRandomIndexes([]int{4, 4, 4}, 20), 0, 10000, 0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that the matcher can properly find matches if the starting block is
|
||||||
|
// shifter from a multiple of 8. This is needed to cover an optimisation with
|
||||||
|
// bitset matching https://github.com/ethereum/go-ethereum/issues/15309.
|
||||||
|
func TestMatcherShifted(t *testing.T) {
|
||||||
|
// Block 0 always matches in the tests, skip ahead of first 8 blocks with the
|
||||||
|
// start to get a potential zero byte in the matcher bitset.
|
||||||
|
|
||||||
|
// To keep the second bitset byte zero, the filter must only match for the first
|
||||||
|
// time in block 16, so doing an all-16 bit filter should suffice.
|
||||||
|
|
||||||
|
// To keep the starting block non divisible by 8, block number 9 is the first
|
||||||
|
// that would introduce a shift and not match block 0.
|
||||||
|
testMatcherBothModes(t, [][]bloomIndexes{{{16, 16, 16}}}, 9, 64, 0)
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that matching on everything doesn't crash (special case internally).
|
// Tests that matching on everything doesn't crash (special case internally).
|
||||||
func TestWildcardMatcher(t *testing.T) {
|
func TestWildcardMatcher(t *testing.T) {
|
||||||
testMatcherBothModes(t, nil, 10000, 0)
|
testMatcherBothModes(t, nil, 0, 10000, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// makeRandomIndexes generates a random filter system, composed on multiple filter
|
// makeRandomIndexes generates a random filter system, composed on multiple filter
|
||||||
@ -104,9 +119,9 @@ func makeRandomIndexes(lengths []int, max int) [][]bloomIndexes {
|
|||||||
// testMatcherDiffBatches runs the given matches test in single-delivery and also
|
// testMatcherDiffBatches runs the given matches test in single-delivery and also
|
||||||
// in batches delivery mode, verifying that all kinds of deliveries are handled
|
// in batches delivery mode, verifying that all kinds of deliveries are handled
|
||||||
// correctly withn.
|
// correctly withn.
|
||||||
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32) {
|
func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32) {
|
||||||
singleton := testMatcher(t, filter, blocks, intermittent, retrievals, 1)
|
singleton := testMatcher(t, filter, start, blocks, intermittent, retrievals, 1)
|
||||||
batched := testMatcher(t, filter, blocks, intermittent, retrievals, 16)
|
batched := testMatcher(t, filter, start, blocks, intermittent, retrievals, 16)
|
||||||
|
|
||||||
if singleton != batched {
|
if singleton != batched {
|
||||||
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
|
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, %v in signleton vs. %v in batched mode", filter, blocks, intermittent, singleton, batched)
|
||||||
@ -115,9 +130,9 @@ func testMatcherDiffBatches(t *testing.T, filter [][]bloomIndexes, blocks uint64
|
|||||||
|
|
||||||
// testMatcherBothModes runs the given matcher test in both continuous as well as
|
// testMatcherBothModes runs the given matcher test in both continuous as well as
|
||||||
// in intermittent mode, verifying that the request counts match each other.
|
// in intermittent mode, verifying that the request counts match each other.
|
||||||
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64, retrievals uint32) {
|
func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, retrievals uint32) {
|
||||||
continuous := testMatcher(t, filter, blocks, false, retrievals, 16)
|
continuous := testMatcher(t, filter, start, blocks, false, retrievals, 16)
|
||||||
intermittent := testMatcher(t, filter, blocks, true, retrievals, 16)
|
intermittent := testMatcher(t, filter, start, blocks, true, retrievals, 16)
|
||||||
|
|
||||||
if continuous != intermittent {
|
if continuous != intermittent {
|
||||||
t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
|
t.Errorf("filter = %v blocks = %v: request count mismatch, %v in continuous vs. %v in intermittent mode", filter, blocks, continuous, intermittent)
|
||||||
@ -126,7 +141,7 @@ func testMatcherBothModes(t *testing.T, filter [][]bloomIndexes, blocks uint64,
|
|||||||
|
|
||||||
// testMatcher is a generic tester to run the given matcher test and return the
|
// testMatcher is a generic tester to run the given matcher test and return the
|
||||||
// number of requests made for cross validation between different modes.
|
// number of requests made for cross validation between different modes.
|
||||||
func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
|
func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, intermittent bool, retrievals uint32, maxReqCount int) uint32 {
|
||||||
// Create a new matcher an simulate our explicit random bitsets
|
// Create a new matcher an simulate our explicit random bitsets
|
||||||
matcher := NewMatcher(testSectionSize, nil)
|
matcher := NewMatcher(testSectionSize, nil)
|
||||||
matcher.filters = filter
|
matcher.filters = filter
|
||||||
@ -145,14 +160,14 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
|
|||||||
quit := make(chan struct{})
|
quit := make(chan struct{})
|
||||||
matches := make(chan uint64, 16)
|
matches := make(chan uint64, 16)
|
||||||
|
|
||||||
session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
|
session, err := matcher.Start(context.Background(), start, blocks-1, matches)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to stat matcher session: %v", err)
|
t.Fatalf("failed to stat matcher session: %v", err)
|
||||||
}
|
}
|
||||||
startRetrievers(session, quit, &requested, maxReqCount)
|
startRetrievers(session, quit, &requested, maxReqCount)
|
||||||
|
|
||||||
// Iterate over all the blocks and verify that the pipeline produces the correct matches
|
// Iterate over all the blocks and verify that the pipeline produces the correct matches
|
||||||
for i := uint64(0); i < blocks; i++ {
|
for i := start; i < blocks; i++ {
|
||||||
if expMatch3(filter, i) {
|
if expMatch3(filter, i) {
|
||||||
match, ok := <-matches
|
match, ok := <-matches
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -158,6 +158,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, err
|
|||||||
return logs, err
|
return logs, err
|
||||||
}
|
}
|
||||||
f.begin = int64(number) + 1
|
f.begin = int64(number) + 1
|
||||||
|
|
||||||
// Retrieve the suggested block and pull any truly matching logs
|
// Retrieve the suggested block and pull any truly matching logs
|
||||||
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
|
||||||
if header == nil || err != nil {
|
if header == nil || err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user