Statediff API to change addresses watched in direct indexing mode #194

Merged
prathamesh0 merged 17 commits from pm-watched-addresses into v1.10.15-statediff-v2 2022-04-05 04:03:36 +00:00
20 changed files with 1483 additions and 280 deletions

View File

@ -52,6 +52,7 @@ ios:
.PHONY: statedifftest .PHONY: statedifftest
statedifftest: | $(GOOSE) statedifftest: | $(GOOSE)
go get github.com/stretchr/testify/assert@v1.7.0
MODE=statediff go test ./statediff/... -v MODE=statediff go test ./statediff/... -v
test: all test: all

View File

@ -3,7 +3,7 @@ version: '3.2'
services: services:
ipld-eth-db: ipld-eth-db:
restart: always restart: always
image: vulcanize/ipld-eth-db:v0.2.0 image: vulcanize/ipld-eth-db:v2.1.1
environment: environment:
POSTGRES_USER: "vdbm" POSTGRES_USER: "vdbm"
POSTGRES_DB: "vulcanize_public" POSTGRES_DB: "vulcanize_public"

1
go.mod
View File

@ -68,6 +68,7 @@ require (
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/thoas/go-funk v0.9.1
github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a

2
go.sum
View File

@ -470,6 +470,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4= github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=

View File

@ -106,15 +106,13 @@ type Params struct {
IncludeTD bool IncludeTD bool
IncludeCode bool IncludeCode bool
WatchedAddresses []common.Address WatchedAddresses []common.Address
WatchedStorageSlots []common.Hash
} }
``` ```
Using these params we can tell the service whether to include state and/or storage intermediate nodes; whether Using these params we can tell the service whether to include state and/or storage intermediate nodes; whether
to include the associated block (header, uncles, and transactions); whether to include the associated receipts; to include the associated block (header, uncles, and transactions); whether to include the associated receipts;
whether to include the total difficulty for this block; whether to include the set of code hashes and code for whether to include the total difficulty for this block; whether to include the set of code hashes and code for
contracts deployed in this block; whether to limit the diffing process to a list of specific addresses; and/or contracts deployed in this block; whether to limit the diffing process to a list of specific addresses.
whether to limit the diffing process to a list of specific storage slot keys.
#### Subscription endpoint #### Subscription endpoint
A websocket supporting RPC endpoint is exposed for subscribing to state diff `StateObjects` that come off the head of the chain while the geth node syncs. A websocket supporting RPC endpoint is exposed for subscribing to state diff `StateObjects` that come off the head of the chain while the geth node syncs.

View File

@ -149,3 +149,8 @@ func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber
func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash common.Hash, params Params) error { func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash common.Hash, params Params) error {
return api.sds.WriteStateDiffFor(blockHash, params) return api.sds.WriteStateDiffFor(blockHash, params)
} }
// WatchAddress changes the list of watched addresses to which the direct indexing is restricted according to given operation
func (api *PublicStateDiffAPI) WatchAddress(operation OperationType, args []WatchAddressArg) error {
return api.sds.WatchAddress(operation, args)
}

View File

@ -123,7 +123,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
node.LeafKey = leafKey node.LeafKey = leafKey
if !bytes.Equal(account.CodeHash, nullCodeHash) { if !bytes.Equal(account.CodeHash, nullCodeHash) {
var storageNodes []StorageNode var storageNodes []StorageNode
err := sdb.buildStorageNodesEventual(account.Root, nil, true, storageNodeAppender(&storageNodes)) err := sdb.buildStorageNodesEventual(account.Root, true, storageNodeAppender(&storageNodes))
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err) return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err)
} }
@ -202,7 +202,7 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, pa
// a map of their leafkey to all the accounts that were touched and exist at A // a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState( diffAccountsAtA, err := sdb.deletedOrUpdatedState(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffPathsAtB, output) diffPathsAtB, params.watchedAddressesLeafKeys, output)
if err != nil { if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
} }
@ -220,12 +220,12 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, pa
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates( err = sdb.buildAccountUpdates(
diffAccountsAtB, diffAccountsAtA, updatedKeys, diffAccountsAtB, diffAccountsAtA, updatedKeys,
params.WatchedStorageSlots, params.IntermediateStorageNodes, output) params.IntermediateStorageNodes, output)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err) return fmt.Errorf("error building diff for updated accounts: %v", err)
} }
// build the diff nodes for created accounts // build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) err = sdb.buildAccountCreations(diffAccountsAtB, params.IntermediateStorageNodes, output, codeOutput)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err) return fmt.Errorf("error building diff for created accounts: %v", err)
} }
@ -247,7 +247,7 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots,
// and a slice of all the paths for the nodes in both of the above sets // and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState( diffAccountsAtB, diffPathsAtB, err := sdb.createdAndUpdatedState(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
params.WatchedAddresses) params.watchedAddressesLeafKeys)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %v", err)
} }
@ -256,7 +256,7 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots,
// a map of their leafkey to all the accounts that were touched and exist at A // a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState( diffAccountsAtA, err := sdb.deletedOrUpdatedState(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffPathsAtB, output) diffPathsAtB, params.watchedAddressesLeafKeys, output)
if err != nil { if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err) return fmt.Errorf("error collecting deletedOrUpdatedNodes: %v", err)
} }
@ -274,12 +274,12 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots,
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two // build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates( err = sdb.buildAccountUpdates(
diffAccountsAtB, diffAccountsAtA, updatedKeys, diffAccountsAtB, diffAccountsAtA, updatedKeys,
params.WatchedStorageSlots, params.IntermediateStorageNodes, output) params.IntermediateStorageNodes, output)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err) return fmt.Errorf("error building diff for updated accounts: %v", err)
} }
// build the diff nodes for created accounts // build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput) err = sdb.buildAccountCreations(diffAccountsAtB, params.IntermediateStorageNodes, output, codeOutput)
if err != nil { if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err) return fmt.Errorf("error building diff for created accounts: %v", err)
} }
@ -289,7 +289,7 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots,
// createdAndUpdatedState returns // createdAndUpdatedState returns
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A // a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both // and a slice of the paths for all of the nodes included in both
func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddresses []common.Address) (AccountMap, map[string]bool, error) { func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddressesLeafKeys map[common.Hash]struct{}) (AccountMap, map[string]bool, error) {
diffPathsAtB := make(map[string]bool) diffPathsAtB := make(map[string]bool)
diffAcountsAtB := make(AccountMap) diffAcountsAtB := make(AccountMap)
it, _ := trie.NewDifferenceIterator(a, b) it, _ := trie.NewDifferenceIterator(a, b)
@ -313,7 +313,7 @@ func (sdb *builder) createdAndUpdatedState(a, b trie.NodeIterator, watchedAddres
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedAddress(watchedAddresses, leafKey) { if isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
diffAcountsAtB[common.Bytes2Hex(leafKey)] = accountWrapper{ diffAcountsAtB[common.Bytes2Hex(leafKey)] = accountWrapper{
NodeType: node.NodeType, NodeType: node.NodeType,
Path: node.Path, Path: node.Path,
@ -386,7 +386,7 @@ func (sdb *builder) createdAndUpdatedStateWithIntermediateNodes(a, b trie.NodeIt
// deletedOrUpdatedState returns a slice of all the pathes that are emptied at B // deletedOrUpdatedState returns a slice of all the pathes that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB map[string]bool, output StateNodeSink) (AccountMap, error) { func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedAddressesLeafKeys map[common.Hash]struct{}, output StateNodeSink) (AccountMap, error) {
diffAccountAtA := make(AccountMap) diffAccountAtA := make(AccountMap)
it, _ := trie.NewDifferenceIterator(b, a) it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) { for it.Next(true) {
@ -409,24 +409,26 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
diffAccountAtA[common.Bytes2Hex(leafKey)] = accountWrapper{ if isWatchedAddress(watchedAddressesLeafKeys, leafKey) {
NodeType: node.NodeType, diffAccountAtA[common.Bytes2Hex(leafKey)] = accountWrapper{
Path: node.Path, NodeType: node.NodeType,
NodeValue: node.NodeValue,
LeafKey: leafKey,
Account: &account,
}
// if this node's path did not show up in diffPathsAtB
// that means the node at this path was deleted (or moved) in B
// emit an empty "removed" diff to signify as such
if _, ok := diffPathsAtB[common.Bytes2Hex(node.Path)]; !ok {
if err := output(StateNode{
Path: node.Path, Path: node.Path,
NodeValue: []byte{}, NodeValue: node.NodeValue,
NodeType: Removed,
LeafKey: leafKey, LeafKey: leafKey,
}); err != nil { Account: &account,
return nil, err }
// if this node's path did not show up in diffPathsAtB
// that means the node at this path was deleted (or moved) in B
// emit an empty "removed" diff to signify as such
if _, ok := diffPathsAtB[common.Bytes2Hex(node.Path)]; !ok {
if err := output(StateNode{
Path: node.Path,
NodeValue: []byte{},
NodeType: Removed,
LeafKey: leafKey,
}); err != nil {
return nil, err
}
} }
} }
case Extension, Branch: case Extension, Branch:
@ -454,8 +456,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states // to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates // needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated // those account maps to remove the accounts which were updated
func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updatedKeys []string, func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updatedKeys []string, intermediateStorageNodes bool, output StateNodeSink) error {
watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink) error {
var err error var err error
for _, key := range updatedKeys { for _, key := range updatedKeys {
createdAcc := creations[key] createdAcc := creations[key]
@ -465,7 +466,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated
oldSR := deletedAcc.Account.Root oldSR := deletedAcc.Account.Root
newSR := createdAcc.Account.Root newSR := createdAcc.Account.Root
err = sdb.buildStorageNodesIncremental( err = sdb.buildStorageNodesIncremental(
oldSR, newSR, watchedStorageKeys, intermediateStorageNodes, oldSR, newSR, intermediateStorageNodes,
storageNodeAppender(&storageDiffs)) storageNodeAppender(&storageDiffs))
if err != nil { if err != nil {
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %s\r\nerror: %v", key, err) return fmt.Errorf("failed building incremental storage diffs for account with leafkey %s\r\nerror: %v", key, err)
@ -489,7 +490,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A // buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts // it also returns the code and codehash for created contract accounts
func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error { func (sdb *builder) buildAccountCreations(accounts AccountMap, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error {
for _, val := range accounts { for _, val := range accounts {
diff := StateNode{ diff := StateNode{
NodeType: val.NodeType, NodeType: val.NodeType,
@ -500,7 +501,7 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) { if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff // For contract creations, any storage node contained is a diff
var storageDiffs []StorageNode var storageDiffs []StorageNode
err := sdb.buildStorageNodesEventual(val.Account.Root, watchedStorageKeys, intermediateStorageNodes, storageNodeAppender(&storageDiffs)) err := sdb.buildStorageNodesEventual(val.Account.Root, intermediateStorageNodes, storageNodeAppender(&storageDiffs))
if err != nil { if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node %x\r\nerror: %v", val.Path, err) return fmt.Errorf("failed building eventual storage diffs for node %x\r\nerror: %v", val.Path, err)
} }
@ -528,7 +529,7 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
// buildStorageNodesEventual builds the storage diff node objects for a created account // buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state // i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error { func (sdb *builder) buildStorageNodesEventual(sr common.Hash, intermediateNodes bool, output StorageNodeSink) error {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) { if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil return nil
} }
@ -539,7 +540,7 @@ func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys
return err return err
} }
it := sTrie.NodeIterator(make([]byte, 0)) it := sTrie.NodeIterator(make([]byte, 0))
err = sdb.buildStorageNodesFromTrie(it, watchedStorageKeys, intermediateNodes, output) err = sdb.buildStorageNodesFromTrie(it, intermediateNodes, output)
if err != nil { if err != nil {
return err return err
} }
@ -549,7 +550,7 @@ func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node interator // buildStorageNodesFromTrie returns all the storage diff node objects in the provided node interator
// if any storage keys are provided it will only return those leaf nodes // if any storage keys are provided it will only return those leaf nodes
// including intermediate nodes can be turned on or off // including intermediate nodes can be turned on or off
func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error { func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, intermediateNodes bool, output StorageNodeSink) error {
for it.Next(true) { for it.Next(true) {
// skip value nodes // skip value nodes
if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) { if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
@ -565,15 +566,13 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedStorageKeys, leafKey) { if err := output(StorageNode{
if err := output(StorageNode{ NodeType: node.NodeType,
NodeType: node.NodeType, Path: node.Path,
Path: node.Path, NodeValue: node.NodeValue,
NodeValue: node.NodeValue, LeafKey: leafKey,
LeafKey: leafKey, }); err != nil {
}); err != nil { return err
return err
}
} }
case Extension, Branch: case Extension, Branch:
if intermediateNodes { if intermediateNodes {
@ -593,7 +592,7 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
} }
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A // buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error { func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, intermediateNodes bool, output StorageNodeSink) error {
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
return nil return nil
} }
@ -609,19 +608,19 @@ func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common
diffPathsAtB, err := sdb.createdAndUpdatedStorage( diffPathsAtB, err := sdb.createdAndUpdatedStorage(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
watchedStorageKeys, intermediateNodes, output) intermediateNodes, output)
if err != nil { if err != nil {
return err return err
} }
err = sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), err = sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffPathsAtB, watchedStorageKeys, intermediateNodes, output) diffPathsAtB, intermediateNodes, output)
if err != nil { if err != nil {
return err return err
} }
return nil return nil
} }
func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) (map[string]bool, error) { func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, intermediateNodes bool, output StorageNodeSink) (map[string]bool, error) {
diffPathsAtB := make(map[string]bool) diffPathsAtB := make(map[string]bool)
it, _ := trie.NewDifferenceIterator(a, b) it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
@ -639,15 +638,13 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedKeys, leafKey) { if err := output(StorageNode{
if err := output(StorageNode{ NodeType: node.NodeType,
NodeType: node.NodeType, Path: node.Path,
Path: node.Path, NodeValue: node.NodeValue,
NodeValue: node.NodeValue, LeafKey: leafKey,
LeafKey: leafKey, }); err != nil {
}); err != nil { return nil, err
return nil, err
}
} }
case Extension, Branch: case Extension, Branch:
if intermediateNodes { if intermediateNodes {
@ -667,7 +664,7 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
return diffPathsAtB, it.Error() return diffPathsAtB, it.Error()
} }
func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error { func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, intermediateNodes bool, output StorageNodeSink) error {
it, _ := trie.NewDifferenceIterator(b, a) it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) { for it.Next(true) {
// skip value nodes // skip value nodes
@ -690,15 +687,13 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB
valueNodePath := append(node.Path, partialPath...) valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath) encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:] leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedKeys, leafKey) { if err := output(StorageNode{
if err := output(StorageNode{ NodeType: Removed,
NodeType: Removed, Path: node.Path,
Path: node.Path, NodeValue: []byte{},
NodeValue: []byte{}, LeafKey: leafKey,
LeafKey: leafKey, }); err != nil {
}); err != nil { return err
return err
}
} }
case Extension, Branch: case Extension, Branch:
if intermediateNodes { if intermediateNodes {
@ -718,30 +713,12 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB
} }
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch // isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
func isWatchedAddress(watchedAddresses []common.Address, stateLeafKey []byte) bool { func isWatchedAddress(watchedAddressesLeafKeys map[common.Hash]struct{}, stateLeafKey []byte) bool {
// If we aren't watching any specific addresses, we are watching everything // If we aren't watching any specific addresses, we are watching everything
if len(watchedAddresses) == 0 { if len(watchedAddressesLeafKeys) == 0 {
return true return true
} }
for _, addr := range watchedAddresses {
addrHashKey := crypto.Keccak256(addr.Bytes())
if bytes.Equal(addrHashKey, stateLeafKey) {
return true
}
}
return false
}
// isWatchedStorageKey is used to check if a storage leaf corresponds to one of the storage slots the builder is configured to watch _, ok := watchedAddressesLeafKeys[common.BytesToHash(stateLeafKey)]
func isWatchedStorageKey(watchedKeys []common.Hash, storageLeafKey []byte) bool { return ok
// If we aren't watching any specific addresses, we are watching everything
if len(watchedKeys) == 0 {
return true
}
for _, hashKey := range watchedKeys {
if bytes.Equal(hashKey.Bytes(), storageLeafKey) {
return true
}
}
return false
} }

View File

@ -987,6 +987,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
params := statediff.Params{ params := statediff.Params{
WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr}, WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr},
} }
params.ComputeWatchedAddressesLeafKeys()
builder = statediff.NewBuilder(chain.StateCache()) builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct { var tests = []struct {
@ -1151,169 +1152,6 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
} }
} }
func TestBuilderWithWatchedAddressAndStorageKeyList(t *testing.T) {
blocks, chain := testhelpers.MakeChain(3, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)
defer chain.Stop()
block0 = testhelpers.Genesis
block1 = blocks[0]
block2 = blocks[1]
block3 = blocks[2]
params := statediff.Params{
WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr},
WatchedStorageSlots: []common.Hash{slot1StorageKey},
}
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *statediff.StateObject
}{
{
"testEmptyDiff",
statediff.Args{
OldStateRoot: block0.Root(),
NewStateRoot: block0.Root(),
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
},
&statediff.StateObject{
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
Nodes: emptyDiffs,
},
},
{
"testBlock0",
//10000 transferred from testBankAddress to account1Addr
statediff.Args{
OldStateRoot: testhelpers.NullHash,
NewStateRoot: block0.Root(),
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
},
&statediff.StateObject{
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
Nodes: emptyDiffs,
},
},
{
"testBlock1",
//10000 transferred from testBankAddress to account1Addr
statediff.Args{
OldStateRoot: block0.Root(),
NewStateRoot: block1.Root(),
BlockNumber: block1.Number(),
BlockHash: block1.Hash(),
},
&statediff.StateObject{
BlockNumber: block1.Number(),
BlockHash: block1.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock1LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock2",
//1000 transferred from testBankAddress to account1Addr
//1000 transferred from account1Addr to account2Addr
statediff.Args{
OldStateRoot: block1.Root(),
NewStateRoot: block2.Root(),
BlockNumber: block2.Number(),
BlockHash: block2.Hash(),
},
&statediff.StateObject{
BlockNumber: block2.Number(),
BlockHash: block2.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock2LeafNode,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{'\x0b'},
NodeType: sdtypes.Leaf,
LeafKey: slot1StorageKey.Bytes(),
NodeValue: slot1StorageLeafNode,
},
},
},
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock2LeafNode,
StorageNodes: emptyStorage,
},
},
CodeAndCodeHashes: []sdtypes.CodeAndCodeHash{
{
Hash: testhelpers.CodeHash,
Code: testhelpers.ByteCodeAfterDeployment,
},
},
},
},
{
"testBlock3",
//the contract's storage is changed
//and the block is mined by account 2
statediff.Args{
OldStateRoot: block2.Root(),
NewStateRoot: block3.Root(),
BlockNumber: block3.Number(),
BlockHash: block3.Hash(),
},
&statediff.StateObject{
BlockNumber: block3.Number(),
BlockHash: block3.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock3LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\nexpected state diff: %+v", diff, test.expected)
}
}
}
func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen) blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr) contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)
@ -1718,6 +1556,286 @@ func TestBuilderWithRemovedAccountAndStorageWithoutIntermediateNodes(t *testing.
} }
} }
func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)
defer chain.Stop()
block3 = blocks[2]
block4 = blocks[3]
block5 = blocks[4]
block6 = blocks[5]
params := statediff.Params{
WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.Account2Addr},
}
params.ComputeWatchedAddressesLeafKeys()
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *statediff.StateObject
}{
{
"testBlock4",
statediff.Args{
OldStateRoot: block3.Root(),
NewStateRoot: block4.Root(),
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
},
&statediff.StateObject{
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x0c'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account2LeafKey,
NodeValue: account2AtBlock4LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock5",
statediff.Args{
OldStateRoot: block4.Root(),
NewStateRoot: block5.Root(),
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
},
&statediff.StateObject{
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock5LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock6",
statediff.Args{
OldStateRoot: block5.Root(),
NewStateRoot: block6.Root(),
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
},
&statediff.StateObject{
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x0c'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account2LeafKey,
NodeValue: account2AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected)
}
}
}
func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)
defer chain.Stop()
block3 = blocks[2]
block4 = blocks[3]
block5 = blocks[4]
block6 = blocks[5]
params := statediff.Params{
WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr},
}
params.ComputeWatchedAddressesLeafKeys()
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *statediff.StateObject
}{
{
"testBlock4",
statediff.Args{
OldStateRoot: block3.Root(),
NewStateRoot: block4.Root(),
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
},
&statediff.StateObject{
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock4LeafNode,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{'\x04'},
NodeType: sdtypes.Leaf,
LeafKey: slot2StorageKey.Bytes(),
NodeValue: slot2StorageLeafNode,
},
{
Path: []byte{'\x0b'},
NodeType: sdtypes.Removed,
LeafKey: slot1StorageKey.Bytes(),
NodeValue: []byte{},
},
{
Path: []byte{'\x0c'},
NodeType: sdtypes.Removed,
LeafKey: slot3StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
},
},
},
{
"testBlock5",
statediff.Args{
OldStateRoot: block4.Root(),
NewStateRoot: block5.Root(),
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
},
&statediff.StateObject{
BlockNumber: block5.Number(),
BlockHash: block5.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock5LeafNode,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{},
NodeType: sdtypes.Leaf,
LeafKey: slot0StorageKey.Bytes(),
NodeValue: slot0StorageLeafRootNode,
},
{
Path: []byte{'\x02'},
NodeType: sdtypes.Removed,
LeafKey: slot0StorageKey.Bytes(),
NodeValue: []byte{},
},
{
Path: []byte{'\x04'},
NodeType: sdtypes.Removed,
LeafKey: slot2StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock5LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock6",
statediff.Args{
OldStateRoot: block5.Root(),
NewStateRoot: block6.Root(),
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
},
&statediff.StateObject{
BlockNumber: block6.Number(),
BlockHash: block6.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Removed,
LeafKey: contractLeafKey,
NodeValue: []byte{},
},
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock6LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\r\n\r\n\r\nexpected state diff: %+v", diff, test.expected)
}
}
}
var ( var (
slot00StorageValue = common.Hex2Bytes("9471562b71999873db5b286df957af199ec94617f7") // prefixed TestBankAddress slot00StorageValue = common.Hex2Bytes("9471562b71999873db5b286df957af199ec94617f7") // prefixed TestBankAddress

View File

@ -20,8 +20,15 @@
package statediff package statediff
import ( import (
"fmt"
"sort" "sort"
"strings" "strings"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
. "github.com/ethereum/go-ethereum/statediff/types"
) )
func sortKeys(data AccountMap) []string { func sortKeys(data AccountMap) []string {
@ -69,5 +76,42 @@ func findIntersection(a, b []string) []string {
} }
} }
} }
}
// loadWatchedAddresses is used to load watched addresses to in-memory write loop params from the db
func loadWatchedAddresses(db *postgres.DB) error {
type Watched struct {
Address string `db:"address"`
}
var watched []Watched
pgStr := "SELECT address FROM eth_meta.watched_addresses"
err := db.Select(&watched, pgStr)
if err != nil {
return fmt.Errorf("error loading watched addresses: %v", err)
}
watchedAddresses := []common.Address{}
for _, entry := range watched {
watchedAddresses = append(watchedAddresses, common.HexToAddress(entry.Address))
}
writeLoopParams.Lock()
defer writeLoopParams.Unlock()
writeLoopParams.WatchedAddresses = watchedAddresses
writeLoopParams.ComputeWatchedAddressesLeafKeys()
return nil
}
// MapWatchAddressArgsToAddresses maps []WatchAddressArg to corresponding []common.Address
func MapWatchAddressArgsToAddresses(args []WatchAddressArg) ([]common.Address, error) {
addresses, ok := funk.Map(args, func(arg WatchAddressArg) common.Address {
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return nil, fmt.Errorf(typeAssertionFailed)
}
return addresses, nil
} }

View File

@ -59,6 +59,12 @@ type Indexer interface {
PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool) ReportDBMetrics(delay time.Duration, quit <-chan bool)
// Methods used by WatchAddress API/functionality.
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
ClearWatchedAddresses() error
} }
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
@ -549,3 +555,101 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
} }
return nil return nil
} }
// InsertWatchedAddresses inserts the given addresses in the database
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
tx, err := sdi.dbWriter.db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
for _, arg := range args {
_, err = tx.Exec(`INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`,
arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return err
}
// RemoveWatchedAddresses removes the given watched addresses from the database
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
tx, err := sdi.dbWriter.db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
for _, arg := range args {
_, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses WHERE address = $1`, arg.Address)
if err != nil {
return fmt.Errorf("error removing watched_addresses entry: %v", err)
}
}
return err
}
// SetWatchedAddresses clears and inserts the given addresses in the database
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
tx, err := sdi.dbWriter.db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
_, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses`)
if err != nil {
return fmt.Errorf("error setting watched_addresses table: %v", err)
}
for _, arg := range args {
_, err = tx.Exec(`INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`,
arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
if err != nil {
return fmt.Errorf("error setting watched_addresses table: %v", err)
}
}
return err
}
// ClearWatchedAddresses clears all the watched addresses from the database
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
_, err := sdi.dbWriter.db.Exec(`DELETE FROM eth_meta.watched_addresses`)
if err != nil {
return fmt.Errorf("error clearing watched_addresses table: %v", err)
}
return nil
}

View File

@ -19,6 +19,7 @@ package indexer_test
import ( import (
"bytes" "bytes"
"fmt" "fmt"
"math/big"
"os" "os"
"testing" "testing"
@ -32,6 +33,7 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help" dshelp "github.com/ipfs/go-ipfs-ds-help"
@ -45,12 +47,15 @@ var (
ind *indexer.StateDiffIndexer ind *indexer.StateDiffIndexer
ipfsPgGet = `SELECT data FROM public.blocks ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1` WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
mockBlock *types.Block mockBlock *types.Block
headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte
state1CID, state2CID, storageCID cid.Cid state1CID, state2CID, storageCID cid.Cid
contract1Address, contract2Address, contract3Address, contract4Address string
contract1CreatedAt, contract2CreatedAt, contract3CreatedAt, contract4CreatedAt uint64
lastFilledAt, watchedAt1, watchedAt2, watchedAt3 uint64
) )
func expectTrue(t *testing.T, value bool) { func expectTrue(t *testing.T, value bool) {
@ -161,15 +166,33 @@ func init() {
rctLeaf3 = orderedRctLeafNodes[2] rctLeaf3 = orderedRctLeafNodes[2]
rctLeaf4 = orderedRctLeafNodes[3] rctLeaf4 = orderedRctLeafNodes[3]
rctLeaf5 = orderedRctLeafNodes[4] rctLeaf5 = orderedRctLeafNodes[4]
contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F"
contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B"
contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc"
contract4Address = "0x0Edf0c4f393a628DE4828B228C48175b3EA297fc"
contract1CreatedAt = uint64(1)
contract2CreatedAt = uint64(2)
contract3CreatedAt = uint64(3)
contract4CreatedAt = uint64(4)
lastFilledAt = uint64(0)
watchedAt1 = uint64(10)
watchedAt2 = uint64(15)
watchedAt3 = uint64(20)
} }
func setup(t *testing.T) { func setupIndexer(t *testing.T) {
db, err = shared.SetupDB() db, err = shared.SetupDB()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db) ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db)
require.NoError(t, err) require.NoError(t, err)
}
func setup(t *testing.T) {
setupIndexer(t)
var tx *indexer.BlockTx var tx *indexer.BlockTx
tx, err = ind.PushBlock( tx, err = ind.PushBlock(
mockBlock, mockBlock,
@ -654,3 +677,297 @@ func TestPublishAndIndexer(t *testing.T) {
shared.ExpectEqual(t, data, []byte{}) shared.ExpectEqual(t, data, []byte{})
}) })
} }
func TestWatchAddressMethods(t *testing.T) {
setupIndexer(t)
defer tearDown(t)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Insert watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.RemoveWatchedAddresses(args)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
ind.RemoveWatchedAddresses(args)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses()
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses()
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
}

View File

@ -53,6 +53,10 @@ func TearDownDB(t *testing.T, db *postgres.DB) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses`)
if err != nil {
t.Fatal(err)
}
err = tx.Commit() err = tx.Commit()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -18,6 +18,7 @@ package statediff
import ( import (
"bytes" "bytes"
"fmt"
"math/big" "math/big"
"strconv" "strconv"
"strings" "strings"
@ -40,6 +41,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/thoas/go-funk"
ind "github.com/ethereum/go-ethereum/statediff/indexer" ind "github.com/ethereum/go-ethereum/statediff/indexer"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@ -48,25 +50,30 @@ import (
) )
const ( const (
chainEventChanSize = 20000 chainEventChanSize = 20000
genesisBlockNumber = 0 genesisBlockNumber = 0
defaultRetryLimit = 3 // default retry limit once deadlock is detected. defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
typeAssertionFailed = "type assertion failed"
unexpectedOperation = "unexpected operation"
) )
var writeLoopParams = Params{ var writeLoopParams = ParamsWithMutex{
IntermediateStateNodes: true, Params: Params{
IntermediateStorageNodes: true, IntermediateStateNodes: true,
IncludeBlock: true, IntermediateStorageNodes: true,
IncludeReceipts: true, IncludeBlock: true,
IncludeTD: true, IncludeReceipts: true,
IncludeCode: true, IncludeTD: true,
IncludeCode: true,
},
} }
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
type blockChain interface { type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
CurrentBlock() *types.Block
GetBlockByHash(hash common.Hash) *types.Block GetBlockByHash(hash common.Hash) *types.Block
GetBlockByNumber(number uint64) *types.Block GetBlockByNumber(number uint64) *types.Block
GetReceiptsByHash(hash common.Hash) types.Receipts GetReceiptsByHash(hash common.Hash) types.Receipts
@ -101,6 +108,8 @@ type IService interface {
WriteStateDiffFor(blockHash common.Hash, params Params) error WriteStateDiffFor(blockHash common.Hash, params Params) error
// Event loop for progressively processing and writing diffs directly to DB // Event loop for progressively processing and writing diffs directly to DB
WriteLoop(chainEventCh chan core.ChainEvent) WriteLoop(chainEventCh chan core.ChainEvent)
// Method to change the addresses being watched in write loop params
WatchAddress(operation OperationType, args []WatchAddressArg) error
} }
// Wraps consructor parameters // Wraps consructor parameters
@ -159,6 +168,8 @@ func NewBlockCache(max uint) blockCache {
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params ServiceParams) error { func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params ServiceParams) error {
blockChain := ethServ.BlockChain() blockChain := ethServ.BlockChain()
var indexer ind.Indexer var indexer ind.Indexer
var db *postgres.DB
var err error
quitCh := make(chan bool) quitCh := make(chan bool)
if params.DBParams != nil { if params.DBParams != nil {
info := nodeinfo.Info{ info := nodeinfo.Info{
@ -170,7 +181,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
} }
// TODO: pass max idle, open, lifetime? // TODO: pass max idle, open, lifetime?
db, err := postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info) db, err = postgres.NewDB(params.DBParams.ConnectionURL, postgres.ConnectionConfig{}, info)
if err != nil { if err != nil {
return err return err
} }
@ -200,6 +211,12 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
} }
stack.RegisterLifecycle(sds) stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs()) stack.RegisterAPIs(sds.APIs())
err = loadWatchedAddresses(db)
if err != nil {
return err
}
return nil return nil
} }
@ -278,7 +295,9 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) { func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) {
// For genesis block we need to return the entire state trie hence we diff it with an empty trie. // For genesis block we need to return the entire state trie hence we diff it with an empty trie.
log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId) log.Info("Writing state diff", "block height", genesisBlockNumber, "worker", workerId)
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams) writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(currBlock, common.Hash{}, writeLoopParams.Params)
writeLoopParams.RUnlock()
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", log.Error("statediff.Service.WriteLoop: processing error", "block height",
genesisBlockNumber, "error", err.Error(), "worker", workerId) genesisBlockNumber, "error", err.Error(), "worker", workerId)
@ -307,7 +326,9 @@ func (sds *Service) writeLoopWorker(params workerParams) {
} }
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams) writeLoopParams.RLock()
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
writeLoopParams.RUnlock()
if err != nil { if err != nil {
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
continue continue
@ -402,6 +423,9 @@ func (sds *Service) streamStateDiff(currentBlock *types.Block, parentRoot common
func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) { func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber) log.Info("sending state diff", "block height", blockNumber)
params.ComputeWatchedAddressesLeafKeys()
if blockNumber == 0 { if blockNumber == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
} }
@ -414,6 +438,9 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload, error) { func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash) log.Info("sending state diff", "block hash", blockHash)
params.ComputeWatchedAddressesLeafKeys()
if currentBlock.NumberU64() == 0 { if currentBlock.NumberU64() == 0 {
return sds.processStateDiff(currentBlock, common.Hash{}, params) return sds.processStateDiff(currentBlock, common.Hash{}, params)
} }
@ -472,6 +499,9 @@ func (sds *Service) newPayload(stateObject []byte, block *types.Block, params Pa
func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, error) { func (sds *Service) StateTrieAt(blockNumber uint64, params Params) (*Payload, error) {
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state trie", "block height", blockNumber) log.Info("sending state trie", "block height", blockNumber)
params.ComputeWatchedAddressesLeafKeys()
return sds.processStateTrie(currentBlock, params) return sds.processStateTrie(currentBlock, params)
} }
@ -494,6 +524,9 @@ func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo
if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) { if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
log.Info("State diffing subscription received; beginning statediff processing") log.Info("State diffing subscription received; beginning statediff processing")
} }
params.ComputeWatchedAddressesLeafKeys()
// Subscription type is defined as the hash of the rlp-serialized subscription params // Subscription type is defined as the hash of the rlp-serialized subscription params
by, err := rlp.EncodeToBytes(params) by, err := rlp.EncodeToBytes(params)
if err != nil { if err != nil {
@ -543,7 +576,7 @@ func (sds *Service) Start() error {
go sds.Loop(chainEventCh) go sds.Loop(chainEventCh)
if sds.enableWriteLoop { if sds.enableWriteLoop {
log.Info("Starting statediff DB write loop", "params", writeLoopParams) log.Info("Starting statediff DB write loop", "params", writeLoopParams.Params)
chainEventCh := make(chan core.ChainEvent, chainEventChanSize) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.WriteLoop(chainEventCh) go sds.WriteLoop(chainEventCh)
} }
@ -640,6 +673,8 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- Cod
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
params.ComputeWatchedAddressesLeafKeys()
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber) currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
parentRoot := common.Hash{} parentRoot := common.Hash{}
if blockNumber != 0 { if blockNumber != 0 {
@ -653,6 +688,8 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node // This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data // for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error { func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
params.ComputeWatchedAddressesLeafKeys()
currentBlock := sds.BlockChain.GetBlockByHash(blockHash) currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
parentRoot := common.Hash{} parentRoot := common.Hash{}
if currentBlock.NumberU64() != 0 { if currentBlock.NumberU64() != 0 {
@ -713,3 +750,102 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
} }
return err return err
} }
// Performs one of following operations on the watched addresses in writeLoopParams and the db:
// add | remove | set | clear
func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg) error {
// lock writeLoopParams for a write
writeLoopParams.Lock()
defer writeLoopParams.Unlock()
// get the current block number
currentBlockNumber := sds.BlockChain.CurrentBlock().Number()
switch operation {
case Add:
// filter out args having an already watched address with a warning
filteredArgs, ok := funk.Filter(args, func(arg WatchAddressArg) bool {
if funk.Contains(writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) {
log.Warn("Address already being watched", "address", arg.Address)
return false
}
return true
}).([]WatchAddressArg)
if !ok {
return fmt.Errorf("add: filtered args %s", typeAssertionFailed)
}
// get addresses from the filtered args
filteredAddresses, err := MapWatchAddressArgsToAddresses(filteredArgs)
if err != nil {
return fmt.Errorf("add: filtered addresses %s", err.Error())
}
// update the db
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
funk.ForEach(filteredAddresses, func(address common.Address) {
writeLoopParams.watchedAddressesLeafKeys[crypto.Keccak256Hash(address.Bytes())] = struct{}{}
})
case Remove:
// get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args)
if err != nil {
return fmt.Errorf("remove: mapped addresses %s", err.Error())
}
// remove the provided addresses from currently watched addresses
addresses, ok := funk.Subtract(writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
if !ok {
return fmt.Errorf("remove: filtered addresses %s", typeAssertionFailed)
}
// update the db
err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = addresses
funk.ForEach(argAddresses, func(address common.Address) {
delete(writeLoopParams.watchedAddressesLeafKeys, crypto.Keccak256Hash(address.Bytes()))
})
case Set:
// get addresses from args
argAddresses, err := MapWatchAddressArgsToAddresses(args)
if err != nil {
return fmt.Errorf("set: mapped addresses %s", err.Error())
}
// update the db
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafKeys()
case Clear:
// update the db
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{}
writeLoopParams.ComputeWatchedAddressesLeafKeys()
default:
return fmt.Errorf("%s %s", unexpectedOperation, operation)
}
return nil
}

View File

@ -144,6 +144,7 @@ func testErrorInChainEventLoop(t *testing.T) {
} }
} }
defaultParams.ComputeWatchedAddressesLeafKeys()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -195,6 +196,8 @@ func testErrorInBlockLoop(t *testing.T) {
} }
}() }()
service.Loop(eventsChannel) service.Loop(eventsChannel)
defaultParams.ComputeWatchedAddressesLeafKeys()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)
@ -268,6 +271,8 @@ func testErrorInStateDiffAt(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
defaultParams.ComputeWatchedAddressesLeafKeys()
if !reflect.DeepEqual(builder.Params, defaultParams) { if !reflect.DeepEqual(builder.Params, defaultParams) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams) t.Logf("Actual params does not equal expected.\nactual:%+v\nexpected: %+v", builder.Params, defaultParams)

View File

@ -39,6 +39,7 @@ type BlockChain struct {
Receipts map[common.Hash]types.Receipts Receipts map[common.Hash]types.Receipts
TDByHash map[common.Hash]*big.Int TDByHash map[common.Hash]*big.Int
TDByNum map[uint64]*big.Int TDByNum map[uint64]*big.Int
currentBlock *types.Block
} }
// SetBlocksForHashes mock method // SetBlocksForHashes mock method
@ -128,6 +129,16 @@ func (bc *BlockChain) GetTd(hash common.Hash, blockNum uint64) *big.Int {
return nil return nil
} }
// SetCurrentBlock test method
func (bc *BlockChain) SetCurrentBlock(block *types.Block) {
bc.currentBlock = block
}
// CurrentBlock mock method
func (bc *BlockChain) CurrentBlock() *types.Block {
return bc.currentBlock
}
func (bc *BlockChain) SetTd(hash common.Hash, blockNum uint64, td *big.Int) { func (bc *BlockChain) SetTd(hash common.Hash, blockNum uint64, td *big.Int) {
if bc.TDByHash == nil { if bc.TDByHash == nil {
bc.TDByHash = make(map[common.Hash]*big.Int) bc.TDByHash = make(map[common.Hash]*big.Int)

View File

@ -0,0 +1,59 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package mocks
import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
// Indexer is a mock state diff indexer
type Indexer struct{}
func (sdi *Indexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*indexer.BlockTx, error) {
return nil, nil
}
func (sdi *Indexer) PushStateNode(tx *indexer.BlockTx, stateNode sdtypes.StateNode) error {
return nil
}
func (sdi *Indexer) PushCodeAndCodeHash(tx *indexer.BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
return nil
}
func (sdi *Indexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {}
func (sdi *Indexer) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error {
return nil
}
func (sdi *Indexer) RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error {
return nil
}
func (sdi *Indexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
return nil
}
func (sdi *Indexer) ClearWatchedAddresses() error {
return nil
}

View File

@ -25,6 +25,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -32,9 +33,15 @@ import (
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
) )
var (
typeAssertionFailed = "type assertion failed"
unexpectedOperation = "unexpected operation"
)
// MockStateDiffService is a mock state diff service // MockStateDiffService is a mock state diff service
type MockStateDiffService struct { type MockStateDiffService struct {
sync.Mutex sync.Mutex
@ -47,6 +54,8 @@ type MockStateDiffService struct {
QuitChan chan bool QuitChan chan bool
Subscriptions map[common.Hash]map[rpc.ID]statediff.Subscription Subscriptions map[common.Hash]map[rpc.ID]statediff.Subscription
SubscriptionTypes map[common.Hash]statediff.Params SubscriptionTypes map[common.Hash]statediff.Params
Indexer ind.Indexer
writeLoopParams statediff.ParamsWithMutex
} }
// Protocols mock method // Protocols mock method
@ -332,3 +341,97 @@ func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
log.Info("unable to close subscription %s; channel has no receiver", id) log.Info("unable to close subscription %s; channel has no receiver", id)
} }
} }
// WatchAddress mock method
func (sds *MockStateDiffService) WatchAddress(operation statediff.OperationType, args []sdtypes.WatchAddressArg) error {
// lock writeLoopParams for a write
sds.writeLoopParams.Lock()
defer sds.writeLoopParams.Unlock()
// get the current block number
currentBlockNumber := sds.BlockChain.CurrentBlock().Number()
switch operation {
case statediff.Add:
// filter out args having an already watched address with a warning
filteredArgs, ok := funk.Filter(args, func(arg sdtypes.WatchAddressArg) bool {
if funk.Contains(sds.writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) {
log.Warn("Address already being watched", "address", arg.Address)
return false
}
return true
}).([]sdtypes.WatchAddressArg)
if !ok {
return fmt.Errorf("add: filtered args %s", typeAssertionFailed)
}
// get addresses from the filtered args
filteredAddresses, err := statediff.MapWatchAddressArgsToAddresses(filteredArgs)
if err != nil {
return fmt.Errorf("add: filtered addresses %s", err.Error())
}
// update the db
err = sds.Indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = append(sds.writeLoopParams.WatchedAddresses, filteredAddresses...)
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys()
case statediff.Remove:
// get addresses from args
argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args)
if err != nil {
return fmt.Errorf("remove: mapped addresses %s", err.Error())
}
// remove the provided addresses from currently watched addresses
addresses, ok := funk.Subtract(sds.writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
if !ok {
return fmt.Errorf("remove: filtered addresses %s", typeAssertionFailed)
}
// update the db
err = sds.Indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = addresses
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys()
case statediff.Set:
// get addresses from args
argAddresses, err := statediff.MapWatchAddressArgsToAddresses(args)
if err != nil {
return fmt.Errorf("set: mapped addresses %s", err.Error())
}
// update the db
err = sds.Indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = argAddresses
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys()
case statediff.Clear:
// update the db
err := sds.Indexer.ClearWatchedAddresses()
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = []common.Address{}
sds.writeLoopParams.ComputeWatchedAddressesLeafKeys()
default:
return fmt.Errorf("%s %s", unexpectedOperation, operation)
}
return nil
}

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"math/big" "math/big"
"os" "os"
"reflect"
"sort" "sort"
"sync" "sync"
"testing" "testing"
@ -87,6 +88,7 @@ func init() {
func TestAPI(t *testing.T) { func TestAPI(t *testing.T) {
testSubscriptionAPI(t) testSubscriptionAPI(t)
testHTTPAPI(t) testHTTPAPI(t)
testWatchAddressAPI(t)
} }
func testSubscriptionAPI(t *testing.T) { func testSubscriptionAPI(t *testing.T) {
@ -246,3 +248,286 @@ func testHTTPAPI(t *testing.T) {
t.Errorf("paylaod does not have the expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64()) t.Errorf("paylaod does not have the expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64())
} }
} }
func testWatchAddressAPI(t *testing.T) {
blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen)
defer chain.Stop()
block6 := blocks[5]
mockBlockChain := &BlockChain{}
mockBlockChain.SetCurrentBlock(block6)
mockIndexer := Indexer{}
mockService := MockStateDiffService{
BlockChain: mockBlockChain,
Indexer: &mockIndexer,
}
// test data
var (
contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F"
contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B"
contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc"
contract4Address = "0x0Edf0c4f393a628DE4828B228C48175b3EA297fc"
contract1CreatedAt = uint64(1)
contract2CreatedAt = uint64(2)
contract3CreatedAt = uint64(3)
contract4CreatedAt = uint64(4)
args1 = []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
startingParams1 = statediff.Params{
WatchedAddresses: []common.Address{},
}
expectedParams1 = statediff.Params{
WatchedAddresses: []common.Address{
common.HexToAddress(contract1Address),
common.HexToAddress(contract2Address),
},
}
args2 = []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
startingParams2 = expectedParams1
expectedParams2 = statediff.Params{
WatchedAddresses: []common.Address{
common.HexToAddress(contract1Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
},
}
args3 = []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
startingParams3 = expectedParams2
expectedParams3 = statediff.Params{
WatchedAddresses: []common.Address{
common.HexToAddress(contract1Address),
},
}
args4 = []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
startingParams4 = expectedParams3
expectedParams4 = statediff.Params{
WatchedAddresses: []common.Address{},
}
args5 = []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
startingParams5 = expectedParams4
expectedParams5 = statediff.Params{
WatchedAddresses: []common.Address{
common.HexToAddress(contract1Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
},
}
args6 = []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
startingParams6 = expectedParams5
expectedParams6 = statediff.Params{
WatchedAddresses: []common.Address{
common.HexToAddress(contract4Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
},
}
args7 = []sdtypes.WatchAddressArg{}
startingParams7 = expectedParams6
expectedParams7 = statediff.Params{
WatchedAddresses: []common.Address{},
}
args8 = []sdtypes.WatchAddressArg{}
startingParams8 = expectedParams6
expectedParams8 = statediff.Params{
WatchedAddresses: []common.Address{},
}
args9 = []sdtypes.WatchAddressArg{}
startingParams9 = expectedParams8
expectedParams9 = statediff.Params{
WatchedAddresses: []common.Address{},
}
)
tests := []struct {
name string
operation statediff.OperationType
args []sdtypes.WatchAddressArg
startingParams statediff.Params
expectedParams statediff.Params
expectedErr error
}{
{
"testAddAddresses",
statediff.Add,
args1,
startingParams1,
expectedParams1,
nil,
},
{
"testAddAddressesSomeWatched",
statediff.Add,
args2,
startingParams2,
expectedParams2,
nil,
},
{
"testRemoveAddresses",
statediff.Remove,
args3,
startingParams3,
expectedParams3,
nil,
},
{
"testRemoveAddressesSomeWatched",
statediff.Remove,
args4,
startingParams4,
expectedParams4,
nil,
},
{
"testSetAddresses",
statediff.Set,
args5,
startingParams5,
expectedParams5,
nil,
},
{
"testSetAddressesSomeWatched",
statediff.Set,
args6,
startingParams6,
expectedParams6,
nil,
},
{
"testSetAddressesEmtpyArgs",
statediff.Set,
args7,
startingParams7,
expectedParams7,
nil,
},
{
"testClearAddresses",
statediff.Clear,
args8,
startingParams8,
expectedParams8,
nil,
},
{
"testClearAddressesEmpty",
statediff.Clear,
args9,
startingParams9,
expectedParams9,
nil,
},
// invalid args
{
"testInvalidOperation",
"WrongOp",
args9,
startingParams9,
statediff.Params{},
fmt.Errorf("%s WrongOp", unexpectedOperation),
},
}
for _, test := range tests {
// set indexing params
mockService.writeLoopParams = statediff.ParamsWithMutex{
Params: test.startingParams,
}
mockService.writeLoopParams.ComputeWatchedAddressesLeafKeys()
// make the API call to change watched addresses
err := mockService.WatchAddress(test.operation, test.args)
if test.expectedErr != nil {
if err.Error() != test.expectedErr.Error() {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual err: %+v\nexpected err: %+v", err, test.expectedErr)
}
continue
}
if err != nil {
t.Error(err)
}
// check updated indexing params
test.expectedParams.ComputeWatchedAddressesLeafKeys()
updatedParams := mockService.writeLoopParams.Params
if !reflect.DeepEqual(updatedParams, test.expectedParams) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual params: %+v\nexpected params: %+v", updatedParams, test.expectedParams)
}
}
}

View File

@ -22,9 +22,11 @@ package statediff
import ( import (
"encoding/json" "encoding/json"
"math/big" "math/big"
"sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
ctypes "github.com/ethereum/go-ethereum/core/types" ctypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/statediff/types"
) )
@ -50,7 +52,21 @@ type Params struct {
IncludeTD bool IncludeTD bool
IncludeCode bool IncludeCode bool
WatchedAddresses []common.Address WatchedAddresses []common.Address
WatchedStorageSlots []common.Hash watchedAddressesLeafKeys map[common.Hash]struct{}
}
// ComputeWatchedAddressesLeafKeys populates a map with keys (Keccak256Hash) of each of the WatchedAddresses
func (p *Params) ComputeWatchedAddressesLeafKeys() {
p.watchedAddressesLeafKeys = make(map[common.Hash]struct{}, len(p.WatchedAddresses))
for _, address := range p.WatchedAddresses {
p.watchedAddressesLeafKeys[crypto.Keccak256Hash(address.Bytes())] = struct{}{}
}
}
// ParamsWithMutex allows to lock the parameters while they are being updated | read from
type ParamsWithMutex struct {
Params
sync.RWMutex
} }
// Args bundles the arguments for the state diff builder // Args bundles the arguments for the state diff builder
@ -111,3 +127,13 @@ type accountWrapper struct {
NodeValue []byte NodeValue []byte
LeafKey []byte LeafKey []byte
} }
// OperationType for type of WatchAddress operation
type OperationType string
const (
Add OperationType = "add"
Remove OperationType = "remove"
Set OperationType = "set"
Clear OperationType = "clear"
)

View File

@ -74,3 +74,10 @@ type CodeAndCodeHash struct {
type StateNodeSink func(StateNode) error type StateNodeSink func(StateNode) error
type StorageNodeSink func(StorageNode) error type StorageNodeSink func(StorageNode) error
type CodeSink func(CodeAndCodeHash) error type CodeSink func(CodeAndCodeHash) error
// WatchAddressArg is a arg type for WatchAddress API
type WatchAddressArg struct {
// Address represents common.Address
Address string
CreatedAt uint64
}