Use prefix comparison for account selective statediffing

This commit is contained in:
Prathamesh Musale 2022-06-22 16:16:44 +05:30
parent c265fdc309
commit cedbd2497e
7 changed files with 168 additions and 110 deletions

View File

@ -198,8 +198,7 @@ func (sdb *StateDiffBuilder) WriteStateDiffObject(args types2.StateRoots, params
}, },
} }
if !params.IntermediateStateNodes || len(params.WatchedAddresses) > 0 { if !params.IntermediateStateNodes {
// if we are watching only specific accounts then we are only diffing leaf nodes
return sdb.BuildStateDiffWithoutIntermediateStateNodes(iterPairs, params, output, codeOutput) return sdb.BuildStateDiffWithoutIntermediateStateNodes(iterPairs, params, output, codeOutput)
} else { } else {
return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, codeOutput) return sdb.BuildStateDiffWithIntermediateStateNodes(iterPairs, params, output, codeOutput)
@ -211,7 +210,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a map of their leafkey to all the accounts that were touched and exist at B // a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedStateWithIntermediateNodes( diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedStateWithIntermediateNodes(
iterPairs[0].Older, iterPairs[0].Newer, output) iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, output)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
} }
@ -220,7 +219,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithIntermediateStateNodes(iterPairs
// a map of their leafkey to all the accounts that were touched and exist at A // a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState( diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, iterPairs[1].Older, iterPairs[1].Newer,
diffAccountsAtB, diffPathsAtB, params.watchedAddressesLeafKeys, diffAccountsAtB, diffPathsAtB, params.watchedAddressesLeafPaths,
params.IntermediateStateNodes, params.IntermediateStorageNodes, output) params.IntermediateStateNodes, params.IntermediateStorageNodes, output)
if err != nil { if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
@ -256,7 +255,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithoutIntermediateStateNodes(iterPai
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState( diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, iterPairs[0].Older, iterPairs[0].Newer,
params.watchedAddressesLeafKeys) params.watchedAddressesLeafPaths)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
} }
@ -265,7 +264,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithoutIntermediateStateNodes(iterPai
// a map of their leafkey to all the accounts that were touched and exist at A // a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState( diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, iterPairs[1].Older, iterPairs[1].Newer,
diffAccountsAtB, diffPathsAtB, params.watchedAddressesLeafKeys, diffAccountsAtB, diffPathsAtB, params.watchedAddressesLeafPaths,
params.IntermediateStateNodes, params.IntermediateStorageNodes, output) params.IntermediateStateNodes, params.IntermediateStorageNodes, output)
if err != nil { if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
@ -299,11 +298,18 @@ func (sdb *StateDiffBuilder) BuildStateDiffWithoutIntermediateStateNodes(iterPai
// createdAndUpdatedState returns // createdAndUpdatedState returns
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A // a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both // and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddressesLeafKeys map[common.Hash]struct{}) (types2.AccountMap, map[string]bool, error) { func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte) (types2.AccountMap, map[string]bool, error) {
diffPathsAtB := make(map[string]bool) diffPathsAtB := make(map[string]bool)
diffAcountsAtB := make(types2.AccountMap) diffAcountsAtB := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
it, _ := trie.NewDifferenceIterator(a, b) it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
// ignore node if it is not along paths of interest
if watchingAddresses && !isValidPath(watchedAddressesLeafPaths, it.Path()) {
continue
}
// skip value nodes // skip value nodes
if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
continue continue
@ -322,9 +328,14 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watc
} }
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
// ignore leaf node if it is not a watched address
if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
continue
}
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
diffAcountsAtB[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{ diffAcountsAtB[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{
NodeType: node.NodeType, NodeType: node.NodeType,
Path: node.Path, Path: node.Path,
@ -333,7 +344,6 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watc
Account: &account, Account: &account,
} }
} }
}
// add both intermediate and leaf node paths to the list of diffPathsAtB // add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(node.Path)] = true diffPathsAtB[common.Bytes2Hex(node.Path)] = true
} }
@ -344,11 +354,18 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, watc
// a slice of all the intermediate nodes that exist in a different state at B than A // a slice of all the intermediate nodes that exist in a different state at B than A
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A // a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both // and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIterator, output types2.StateNodeSink) (types2.AccountMap, map[string]bool, error) { func (sdb *StateDiffBuilder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte, output types2.StateNodeSink) (types2.AccountMap, map[string]bool, error) {
diffPathsAtB := make(map[string]bool) diffPathsAtB := make(map[string]bool)
diffAcountsAtB := make(types2.AccountMap) diffAccountsAtB := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
it, _ := trie.NewDifferenceIterator(a, b) it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
// ignore node if it is not along paths of interest
if watchingAddresses && !isValidPath(watchedAddressesLeafPaths, it.Path()) {
continue
}
// skip value nodes // skip value nodes
if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
continue continue
@ -367,9 +384,15 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStateWithIntermediateNodes(a, b tr
} }
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
// ignore leaf node if it is not a watched address
if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
continue
}
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
diffAcountsAtB[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{ diffAccountsAtB[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{
NodeType: node.NodeType, NodeType: node.NodeType,
Path: node.Path, Path: node.Path,
NodeValue: node.NodeValue, NodeValue: node.NodeValue,
@ -392,15 +415,22 @@ func (sdb *StateDiffBuilder) createdAndUpdatedStateWithIntermediateNodes(a, b tr
// add both intermediate and leaf node paths to the list of diffPathsAtB // add both intermediate and leaf node paths to the list of diffPathsAtB
diffPathsAtB[common.Bytes2Hex(node.Path)] = true diffPathsAtB[common.Bytes2Hex(node.Path)] = true
} }
return diffAcountsAtB, diffPathsAtB, it.Error() return diffAccountsAtB, diffPathsAtB, it.Error()
} }
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, diffPathsAtB map[string]bool, watchedAddressesLeafKeys map[common.Hash]struct{}, intermediateStateNodes, intermediateStorageNodes bool, output types2.StateNodeSink) (types2.AccountMap, error) { func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB types2.AccountMap, diffPathsAtB map[string]bool, watchedAddressesLeafPaths [][]byte, intermediateStateNodes, intermediateStorageNodes bool, output types2.StateNodeSink) (types2.AccountMap, error) {
diffAccountAtA := make(types2.AccountMap) diffAccountAtA := make(types2.AccountMap)
watchingAddresses := len(watchedAddressesLeafPaths) > 0
it, _ := trie.NewDifferenceIterator(b, a) it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) { for it.Next(true) {
// ignore node if it is not along paths of interest
if watchingAddresses && !isValidPath(watchedAddressesLeafPaths, it.Path()) {
continue
}
// skip value nodes // skip value nodes
if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
continue continue
@ -419,9 +449,14 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
} }
partialPath := trie.CompactToHex(nodeElements[0].([]byte)) partialPath := trie.CompactToHex(nodeElements[0].([]byte))
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
// ignore leaf node if it is not a watched address
if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) {
continue
}
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
diffAccountAtA[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{ diffAccountAtA[common.Bytes2Hex(leafKey)] = types2.AccountWrapper{
NodeType: node.NodeType, NodeType: node.NodeType,
Path: node.Path, Path: node.Path,
@ -464,7 +499,6 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA
return nil, err return nil, err
} }
} }
}
case types2.Extension, types2.Branch: case types2.Extension, types2.Branch:
// if this node's path did not show up in diffPathsAtB // if this node's path did not show up in diffPathsAtB
// that means the node at this path was deleted (or moved) in B // that means the node at this path was deleted (or moved) in B
@ -830,13 +864,29 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, dif
return it.Error() return it.Error()
} }
// isValidPath is used to check if a node is a parent | ancestor to one of the addresses the builder is configured to watch
func isValidPath(watchedAddressesLeafPaths [][]byte, currentPath []byte) bool {
for _, watchedAddressPath := range watchedAddressesLeafPaths {
if bytes.HasPrefix(watchedAddressPath, currentPath) {
return true
}
}
return false
}
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch // isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
func isWatchedAddress(watchedAddressesLeafKeys map[common.Hash]struct{}, stateLeafKey []byte) bool { func isWatchedAddress(watchedAddressesLeafPaths [][]byte, valueNodePath []byte) bool {
// If we aren't watching any specific addresses, we are watching everything // If we aren't watching any specific addresses, we are watching everything
if len(watchedAddressesLeafKeys) == 0 { if len(watchedAddressesLeafPaths) == 0 {
return true return true
} }
_, ok := watchedAddressesLeafKeys[common.BytesToHash(stateLeafKey)] for _, watchedAddressPath := range watchedAddressesLeafPaths {
return ok if bytes.Equal(watchedAddressPath, valueNodePath) {
return true
}
}
return false
} }

View File

@ -1003,7 +1003,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
params := statediff.Params{ params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr}, WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr},
} }
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
builder = statediff.NewBuilder(chain.StateCache()) builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct { var tests = []struct {
@ -1608,7 +1608,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
params := statediff.Params{ params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.Account2Addr}, WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.Account2Addr},
} }
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
builder = statediff.NewBuilder(chain.StateCache()) builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct { var tests = []struct {
@ -1726,7 +1726,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
params := statediff.Params{ params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr}, WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr},
} }
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
builder = statediff.NewBuilder(chain.StateCache()) builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct { var tests = []struct {

View File

@ -55,14 +55,14 @@ type Params struct {
IncludeTD bool IncludeTD bool
IncludeCode bool IncludeCode bool
WatchedAddresses []common.Address WatchedAddresses []common.Address
watchedAddressesLeafKeys map[common.Hash]struct{} watchedAddressesLeafPaths [][]byte
} }
// ComputeWatchedAddressesLeafKeys populates a map with keys (Keccak256Hash) of each of the WatchedAddresses // ComputeWatchedAddressesLeafPaths populates a slice with paths (hex_encoding(Keccak256)) of each of the WatchedAddresses
func (p *Params) ComputeWatchedAddressesLeafKeys() { func (p *Params) ComputeWatchedAddressesLeafPaths() {
p.watchedAddressesLeafKeys = make(map[common.Hash]struct{}, len(p.WatchedAddresses)) p.watchedAddressesLeafPaths = make([][]byte, len(p.WatchedAddresses))
for _, address := range p.WatchedAddresses { for i, address := range p.WatchedAddresses {
p.watchedAddressesLeafKeys[crypto.Keccak256Hash(address.Bytes())] = struct{}{} p.watchedAddressesLeafPaths[i] = keybytesToHex(crypto.Keccak256(address.Bytes()))
} }
} }
@ -77,3 +77,15 @@ type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash OldStateRoot, NewStateRoot, BlockHash common.Hash
BlockNumber *big.Int BlockNumber *big.Int
} }
// https://github.com/ethereum/go-ethereum/blob/master/trie/encoding.go#L97
func keybytesToHex(str []byte) []byte {
l := len(str)*2 + 1
var nibbles = make([]byte, l)
for i, b := range str {
nibbles[i*2] = b / 16
nibbles[i*2+1] = b % 16
}
nibbles[l-1] = 16
return nibbles
}

View File

@ -477,8 +477,8 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber) log.Info("sending state diff", "block height", blockNumber)
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
if blockNumber == 0 { if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
@ -493,8 +493,8 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash) log.Info("sending state diff", "block hash", blockHash)
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
if currentBlock.NumberU64() == 0 { if currentBlock.NumberU64() == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
@ -555,8 +555,8 @@ func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state trie", "block height", blockNumber) log.Info("sending state trie", "block height", blockNumber)
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
return sds.processStateTrie(currentBlock, params) return sds.processStateTrie(currentBlock, params)
} }
@ -581,8 +581,8 @@ func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo
log.Info("State diffing subscription received; beginning statediff processing") log.Info("State diffing subscription received; beginning statediff processing")
} }
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
// Subscription type is defined as the hash of the rlp-serialized subscription params // Subscription type is defined as the hash of the rlp-serialized subscription params
by, err := rlp.EncodeToBytes(&params) by, err := rlp.EncodeToBytes(&params)
@ -777,8 +777,8 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
parentRoot := common.Hash{} parentRoot := common.Hash{}
@ -793,8 +793,8 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
// compute leaf keys of watched addresses in the params // compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafKeys() params.ComputeWatchedAddressesLeafPaths()
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
parentRoot := common.Hash{} parentRoot := common.Hash{}
@ -902,9 +902,7 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...) writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
funk.ForEach(filteredAddresses, func(address common.Address) { writeLoopParams.ComputeWatchedAddressesLeafPaths()
writeLoopParams.watchedAddressesLeafKeys[crypto.Keccak256Hash(address.Bytes())] = struct{}{}
})
case types2.Remove: case types2.Remove:
// get addresses from args // get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args) argAddresses, err := MapWatchAddressArgsToAddresses(args)
@ -926,9 +924,7 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = addresses writeLoopParams.WatchedAddresses = addresses
funk.ForEach(argAddresses, func(address common.Address) { writeLoopParams.ComputeWatchedAddressesLeafPaths()
delete(writeLoopParams.watchedAddressesLeafKeys, crypto.Keccak256Hash(address.Bytes()))
})
case types2.Set: case types2.Set:
// get addresses from args // get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args) argAddresses, err := MapWatchAddressArgsToAddresses(args)
@ -944,7 +940,7 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = argAddresses writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafKeys() writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear: case types2.Clear:
// update the db // update the db
err := sds.indexer.ClearWatchedAddresses() err := sds.indexer.ClearWatchedAddresses()
@ -954,7 +950,7 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
// update in-memory params // update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{} writeLoopParams.WatchedAddresses = []common.Address{}
writeLoopParams.ComputeWatchedAddressesLeafKeys() writeLoopParams.ComputeWatchedAddressesLeafPaths()
default: default:
return fmt.Errorf("%s %s", unexpectedOperation, operation) return fmt.Errorf("%s %s", unexpectedOperation, operation)
@ -974,7 +970,7 @@ func loadWatchedAddresses(indexer interfaces.StateDiffIndexer) error {
defer writeLoopParams.Unlock() defer writeLoopParams.Unlock()
writeLoopParams.WatchedAddresses = watchedAddresses writeLoopParams.WatchedAddresses = watchedAddresses
writeLoopParams.ComputeWatchedAddressesLeafKeys() writeLoopParams.ComputeWatchedAddressesLeafPaths()
return nil return nil
} }

View File

@ -146,7 +146,7 @@ func testErrorInChainEventLoop(t *testing.T) {
} }
} }
defaultParams.ComputeWatchedAddressesLeafKeys() defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -199,7 +199,7 @@ func testErrorInBlockLoop(t *testing.T) {
}() }()
service.Loop(eventsChannel) service.Loop(eventsChannel)
defaultParams.ComputeWatchedAddressesLeafKeys() defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -274,7 +274,7 @@ func testErrorInStateDiffAt(t *testing.T) {
t.Error(err) t.Error(err)
} }
defaultParams.ComputeWatchedAddressesLeafKeys() defaultParams.ComputeWatchedAddressesLeafPaths()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)

View File

@ -380,7 +380,7 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
// update in-memory params // update in-memory params
sds.writeLoopParams.WatchedAddresses = append(sds.writeLoopParams.WatchedAddresses, filteredAddresses...) sds.writeLoopParams.WatchedAddresses = append(sds.writeLoopParams.WatchedAddresses, filteredAddresses...)
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case sdtypes.Remove: case sdtypes.Remove:
// get addresses from args // get addresses from args
argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args) argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args)
@ -402,7 +402,7 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
// update in-memory params // update in-memory params
sds.writeLoopParams.WatchedAddresses = addresses sds.writeLoopParams.WatchedAddresses = addresses
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case sdtypes.Set: case sdtypes.Set:
// get addresses from args // get addresses from args
argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args) argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args)
@ -418,7 +418,7 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
// update in-memory params // update in-memory params
sds.writeLoopParams.WatchedAddresses = argAddresses sds.writeLoopParams.WatchedAddresses = argAddresses
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
case sdtypes.Clear: case sdtypes.Clear:
// update the db // update the db
err := sds.Indexer.ClearWatchedAddresses() err := sds.Indexer.ClearWatchedAddresses()
@ -428,7 +428,7 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
// update in-memory params // update in-memory params
sds.writeLoopParams.WatchedAddresses = []common.Address{} sds.writeLoopParams.WatchedAddresses = []common.Address{}
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys() sds.writeLoopParams.ComputeWatchedAddressesLeafPaths()
default: default:
return fmt.Errorf("%s %s", unexpectedOperation, operation) return fmt.Errorf("%s %s", unexpectedOperation, operation)

View File

@ -513,7 +513,7 @@ func testWatchAddressAPI(t *testing.T) {
mockService.writeLoopParams = statediff.ParamsWithMutex{ mockService.writeLoopParams = statediff.ParamsWithMutex{
Params: test.startingParams, Params: test.startingParams,
} }
mockService.writeLoopParams.ComputeWatchedAddressesLeafKeys() mockService.writeLoopParams.ComputeWatchedAddressesLeafPaths()
// make the API call to change watched addresses // make the API call to change watched addresses
err := mockService.WatchAddress(test.operation, test.args) err := mockService.WatchAddress(test.operation, test.args)
@ -530,7 +530,7 @@ func testWatchAddressAPI(t *testing.T) {
} }
// check updated indexing params // check updated indexing params
test.expectedParams.ComputeWatchedAddressesLeafKeys() test.expectedParams.ComputeWatchedAddressesLeafPaths()
updatedParams := mockService.writeLoopParams.Params updatedParams := mockService.writeLoopParams.Params
if !reflect.DeepEqual(updatedParams, test.expectedParams) { if !reflect.DeepEqual(updatedParams, test.expectedParams) {
t.Logf("Test failed: %s", test.name) t.Logf("Test failed: %s", test.name)