eth/filters: add FindOnce for iterator-like operation (#3435)
This commit introduces a FindOnce method for filters. FindOnce finds the next block that matches the filter and returns all matching logs from that block. If there are no further matching logs, it returns a nil slice. This method allows callers to iterate over large sets of logs progressively. The changes introduce a small inefficiency relating to mipmaps: the first time a filter is called, it acts as if all mipmaps are matched, and thus iterates several blocks near the requested start point. This is in the interest of simplicity and avoiding duplicate mipmap lookups each time FindOnce is called.
This commit is contained in:
		
							parent
							
								
									ba996f5e27
								
							
						
					
					
						commit
						1fe67c125d
					
				| @ -54,6 +54,8 @@ type Filter struct { | |||||||
| 
 | 
 | ||||||
| // New creates a new filter which uses a bloom filter on blocks to figure out whether
 | // New creates a new filter which uses a bloom filter on blocks to figure out whether
 | ||||||
| // a particular block is interesting or not.
 | // a particular block is interesting or not.
 | ||||||
|  | // MipMaps allow past blocks to be searched much more efficiently, but are not available
 | ||||||
|  | // to light clients.
 | ||||||
| func New(backend Backend, useMipMap bool) *Filter { | func New(backend Backend, useMipMap bool) *Filter { | ||||||
| 	return &Filter{ | 	return &Filter{ | ||||||
| 		backend:   backend, | 		backend:   backend, | ||||||
| @ -85,8 +87,11 @@ func (f *Filter) SetTopics(topics [][]common.Hash) { | |||||||
| 	f.topics = topics | 	f.topics = topics | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Run filters logs with the current parameters set
 | // FindOnce searches the blockchain for matching log entries, returning
 | ||||||
| func (f *Filter) Find(ctx context.Context) ([]*vm.Log, error) { | // all matching entries from the first block that contains matches,
 | ||||||
|  | // updating the start point of the filter accordingly. If no results are
 | ||||||
|  | // found, a nil slice is returned.
 | ||||||
|  | func (f *Filter) FindOnce(ctx context.Context) ([]*vm.Log, error) { | ||||||
| 	head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) | 	head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) | ||||||
| 	if head == nil { | 	if head == nil { | ||||||
| 		return nil, nil | 		return nil, nil | ||||||
| @ -106,47 +111,69 @@ func (f *Filter) Find(ctx context.Context) ([]*vm.Log, error) { | |||||||
| 	// uses the mipmap bloom filters to check for fast inclusion and uses
 | 	// uses the mipmap bloom filters to check for fast inclusion and uses
 | ||||||
| 	// higher range probability in order to ensure at least a false positive
 | 	// higher range probability in order to ensure at least a false positive
 | ||||||
| 	if !f.useMipMap || len(f.addresses) == 0 { | 	if !f.useMipMap || len(f.addresses) == 0 { | ||||||
| 		return f.getLogs(ctx, beginBlockNo, endBlockNo) | 		logs, blockNumber, err := f.getLogs(ctx, beginBlockNo, endBlockNo) | ||||||
|  | 		f.begin = int64(blockNumber + 1) | ||||||
|  | 		return logs, err | ||||||
| 	} | 	} | ||||||
| 	return f.mipFind(beginBlockNo, endBlockNo, 0), nil | 
 | ||||||
|  | 	logs, blockNumber := f.mipFind(beginBlockNo, endBlockNo, 0) | ||||||
|  | 	f.begin = int64(blockNumber + 1) | ||||||
|  | 	return logs, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (f *Filter) mipFind(start, end uint64, depth int) (logs []*vm.Log) { | // Run filters logs with the current parameters set
 | ||||||
|  | func (f *Filter) Find(ctx context.Context) (logs []*vm.Log, err error) { | ||||||
|  | 	for { | ||||||
|  | 		newLogs, err := f.FindOnce(ctx) | ||||||
|  | 		if len(newLogs) == 0 || err != nil { | ||||||
|  | 			return logs, err | ||||||
|  | 		} | ||||||
|  | 		logs = append(logs, newLogs...) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (f *Filter) mipFind(start, end uint64, depth int) (logs []*vm.Log, blockNumber uint64) { | ||||||
| 	level := core.MIPMapLevels[depth] | 	level := core.MIPMapLevels[depth] | ||||||
| 	// normalise numerator so we can work in level specific batches and
 | 	// normalise numerator so we can work in level specific batches and
 | ||||||
| 	// work with the proper range checks
 | 	// work with the proper range checks
 | ||||||
| 	for num := start / level * level; num <= end; num += level { | 	for num := start / level * level; num <= end; num += level { | ||||||
| 		// find addresses in bloom filters
 | 		// find addresses in bloom filters
 | ||||||
| 		bloom := core.GetMipmapBloom(f.db, num, level) | 		bloom := core.GetMipmapBloom(f.db, num, level) | ||||||
|  | 		// Don't bother checking the first time through the loop - we're probably picking
 | ||||||
|  | 		// up where a previous run left off.
 | ||||||
|  | 		first := true | ||||||
| 		for _, addr := range f.addresses { | 		for _, addr := range f.addresses { | ||||||
| 			if bloom.TestBytes(addr[:]) { | 			if first || bloom.TestBytes(addr[:]) { | ||||||
|  | 				first = false | ||||||
| 				// range check normalised values and make sure that
 | 				// range check normalised values and make sure that
 | ||||||
| 				// we're resolving the correct range instead of the
 | 				// we're resolving the correct range instead of the
 | ||||||
| 				// normalised values.
 | 				// normalised values.
 | ||||||
| 				start := uint64(math.Max(float64(num), float64(start))) | 				start := uint64(math.Max(float64(num), float64(start))) | ||||||
| 				end := uint64(math.Min(float64(num+level-1), float64(end))) | 				end := uint64(math.Min(float64(num+level-1), float64(end))) | ||||||
| 				if depth+1 == len(core.MIPMapLevels) { | 				if depth+1 == len(core.MIPMapLevels) { | ||||||
| 					l, _ := f.getLogs(context.Background(), start, end) | 					l, blockNumber, _ := f.getLogs(context.Background(), start, end) | ||||||
| 					logs = append(logs, l...) | 					if len(l) > 0 { | ||||||
| 				} else { | 						return l, blockNumber | ||||||
| 					logs = append(logs, f.mipFind(start, end, depth+1)...) | 					} | ||||||
|  | 				} else { | ||||||
|  | 					l, blockNumber := f.mipFind(start, end, depth+1) | ||||||
|  | 					if len(l) > 0 { | ||||||
|  | 						return l, blockNumber | ||||||
|  | 					} | ||||||
| 				} | 				} | ||||||
| 				// break so we don't check the same range for each
 |  | ||||||
| 				// possible address. Checks on multiple addresses
 |  | ||||||
| 				// are handled further down the stack.
 |  | ||||||
| 				break |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return logs | 	return nil, end | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*vm.Log, err error) { | func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*vm.Log, blockNumber uint64, err error) { | ||||||
| 	for i := start; i <= end; i++ { | 	for i := start; i <= end; i++ { | ||||||
| 		header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(i)) | 		blockNumber := rpc.BlockNumber(i) | ||||||
|  | 		header, err := f.backend.HeaderByNumber(ctx, blockNumber) | ||||||
| 		if header == nil || err != nil { | 		if header == nil || err != nil { | ||||||
| 			return logs, err | 			return logs, end, err | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// Use bloom filtering to see if this block is interesting given the
 | 		// Use bloom filtering to see if this block is interesting given the
 | ||||||
| @ -155,17 +182,20 @@ func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []*vm.Log | |||||||
| 			// Get the logs of the block
 | 			// Get the logs of the block
 | ||||||
| 			receipts, err := f.backend.GetReceipts(ctx, header.Hash()) | 			receipts, err := f.backend.GetReceipts(ctx, header.Hash()) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, err | 				return nil, end, err | ||||||
| 			} | 			} | ||||||
| 			var unfiltered []*vm.Log | 			var unfiltered []*vm.Log | ||||||
| 			for _, receipt := range receipts { | 			for _, receipt := range receipts { | ||||||
| 				unfiltered = append(unfiltered, ([]*vm.Log)(receipt.Logs)...) | 				unfiltered = append(unfiltered, ([]*vm.Log)(receipt.Logs)...) | ||||||
| 			} | 			} | ||||||
| 			logs = append(logs, filterLogs(unfiltered, nil, nil, f.addresses, f.topics)...) | 			logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) | ||||||
|  | 			if len(logs) > 0 { | ||||||
|  | 				return logs, uint64(blockNumber), nil | ||||||
|  | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	return logs, nil | 	return logs, end, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func includes(addresses []common.Address, a common.Address) bool { | func includes(addresses []common.Address, a common.Address) bool { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user