Remove support for watched storage slots

This commit is contained in:
Prathamesh Musale 2022-02-03 11:12:51 +05:30
parent 4e96d4f444
commit 8088141a40
13 changed files with 127 additions and 1081 deletions

View File

@ -106,15 +106,13 @@ type Params struct {
IncludeTD bool
IncludeCode bool
WatchedAddresses []common.Address
WatchedStorageSlots []common.Hash
}
```
Using these params we can tell the service whether to include state and/or storage intermediate nodes; whether
to include the associated block (header, uncles, and transactions); whether to include the associated receipts;
whether to include the total difficulty for this block; whether to include the set of code hashes and code for
contracts deployed in this block; whether to limit the diffing process to a list of specific addresses; and/or
whether to limit the diffing process to a list of specific storage slot keys.
contracts deployed in this block; whether to limit the diffing process to a list of specific addresses.
#### Subscription endpoint
A websocket supporting RPC endpoint is exposed for subscribing to state diff `StateObjects` that come off the head of the chain while the geth node syncs.

View File

@ -150,7 +150,7 @@ func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash
return api.sds.WriteStateDiffFor(blockHash, params)
}
// WatchAddress changes the list of watched addresses | storage slots to which the direct indexing is restricted according to given operation
// WatchAddress changes the list of watched addresses to which the direct indexing is restricted according to given operation
func (api *PublicStateDiffAPI) WatchAddress(operation OperationType, args []WatchAddressArg) error {
return api.sds.WatchAddress(operation, args)
}

View File

@ -123,7 +123,7 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
node.LeafKey = leafKey
if !bytes.Equal(account.CodeHash, nullCodeHash) {
var storageNodes []StorageNode
err := sdb.buildStorageNodesEventual(account.Root, nil, true, storageNodeAppender(&storageNodes))
err := sdb.buildStorageNodesEventual(account.Root, true, storageNodeAppender(&storageNodes))
if err != nil {
return nil, nil, fmt.Errorf("failed building eventual storage diffs for account %+v\r\nerror: %v", account, err)
}
@ -220,12 +220,12 @@ func (sdb *builder) buildStateDiffWithIntermediateStateNodes(args StateRoots, pa
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(
diffAccountsAtB, diffAccountsAtA, updatedKeys,
params.WatchedStorageSlots, params.IntermediateStorageNodes, output)
params.IntermediateStorageNodes, output)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput)
err = sdb.buildAccountCreations(diffAccountsAtB, params.IntermediateStorageNodes, output, codeOutput)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err)
}
@ -274,12 +274,12 @@ func (sdb *builder) buildStateDiffWithoutIntermediateStateNodes(args StateRoots,
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(
diffAccountsAtB, diffAccountsAtA, updatedKeys,
params.WatchedStorageSlots, params.IntermediateStorageNodes, output)
params.IntermediateStorageNodes, output)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %v", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, params.WatchedStorageSlots, params.IntermediateStorageNodes, output, codeOutput)
err = sdb.buildAccountCreations(diffAccountsAtB, params.IntermediateStorageNodes, output, codeOutput)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %v", err)
}
@ -456,8 +456,7 @@ func (sdb *builder) deletedOrUpdatedState(a, b trie.NodeIterator, diffPathsAtB m
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updatedKeys []string,
watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink) error {
func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updatedKeys []string, intermediateStorageNodes bool, output StateNodeSink) error {
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
@ -467,7 +466,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated
oldSR := deletedAcc.Account.Root
newSR := createdAcc.Account.Root
err = sdb.buildStorageNodesIncremental(
oldSR, newSR, watchedStorageKeys, intermediateStorageNodes,
oldSR, newSR, intermediateStorageNodes,
storageNodeAppender(&storageDiffs))
if err != nil {
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %s\r\nerror: %v", key, err)
@ -491,7 +490,7 @@ func (sdb *builder) buildAccountUpdates(creations, deletions AccountMap, updated
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKeys []common.Hash, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error {
func (sdb *builder) buildAccountCreations(accounts AccountMap, intermediateStorageNodes bool, output StateNodeSink, codeOutput CodeSink) error {
for _, val := range accounts {
diff := StateNode{
NodeType: val.NodeType,
@ -502,7 +501,7 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
var storageDiffs []StorageNode
err := sdb.buildStorageNodesEventual(val.Account.Root, watchedStorageKeys, intermediateStorageNodes, storageNodeAppender(&storageDiffs))
err := sdb.buildStorageNodesEventual(val.Account.Root, intermediateStorageNodes, storageNodeAppender(&storageDiffs))
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node %x\r\nerror: %v", val.Path, err)
}
@ -530,7 +529,7 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
// buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error {
func (sdb *builder) buildStorageNodesEventual(sr common.Hash, intermediateNodes bool, output StorageNodeSink) error {
if bytes.Equal(sr.Bytes(), emptyContractRoot.Bytes()) {
return nil
}
@ -541,7 +540,7 @@ func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys
return err
}
it := sTrie.NodeIterator(make([]byte, 0))
err = sdb.buildStorageNodesFromTrie(it, watchedStorageKeys, intermediateNodes, output)
err = sdb.buildStorageNodesFromTrie(it, intermediateNodes, output)
if err != nil {
return err
}
@ -551,7 +550,7 @@ func (sdb *builder) buildStorageNodesEventual(sr common.Hash, watchedStorageKeys
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node interator
// if any storage keys are provided it will only return those leaf nodes
// including intermediate nodes can be turned on or off
func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error {
func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, intermediateNodes bool, output StorageNodeSink) error {
for it.Next(true) {
// skip value nodes
if it.Leaf() || bytes.Equal(nullHashBytes, it.Hash().Bytes()) {
@ -567,15 +566,13 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedStorageKeys, leafKey) {
if err := output(StorageNode{
NodeType: node.NodeType,
Path: node.Path,
NodeValue: node.NodeValue,
LeafKey: leafKey,
}); err != nil {
return err
}
if err := output(StorageNode{
NodeType: node.NodeType,
Path: node.Path,
NodeValue: node.NodeValue,
LeafKey: leafKey,
}); err != nil {
return err
}
case Extension, Branch:
if intermediateNodes {
@ -595,7 +592,7 @@ func (sdb *builder) buildStorageNodesFromTrie(it trie.NodeIterator, watchedStora
}
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, watchedStorageKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error {
func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common.Hash, intermediateNodes bool, output StorageNodeSink) error {
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
return nil
}
@ -611,19 +608,19 @@ func (sdb *builder) buildStorageNodesIncremental(oldSR common.Hash, newSR common
diffPathsAtB, err := sdb.createdAndUpdatedStorage(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
watchedStorageKeys, intermediateNodes, output)
intermediateNodes, output)
if err != nil {
return err
}
err = sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffPathsAtB, watchedStorageKeys, intermediateNodes, output)
diffPathsAtB, intermediateNodes, output)
if err != nil {
return err
}
return nil
}
func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) (map[string]bool, error) {
func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, intermediateNodes bool, output StorageNodeSink) (map[string]bool, error) {
diffPathsAtB := make(map[string]bool)
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
@ -641,15 +638,13 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedKeys, leafKey) {
if err := output(StorageNode{
NodeType: node.NodeType,
Path: node.Path,
NodeValue: node.NodeValue,
LeafKey: leafKey,
}); err != nil {
return nil, err
}
if err := output(StorageNode{
NodeType: node.NodeType,
Path: node.Path,
NodeValue: node.NodeValue,
LeafKey: leafKey,
}); err != nil {
return nil, err
}
case Extension, Branch:
if intermediateNodes {
@ -669,7 +664,7 @@ func (sdb *builder) createdAndUpdatedStorage(a, b trie.NodeIterator, watchedKeys
return diffPathsAtB, it.Error()
}
func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, watchedKeys []common.Hash, intermediateNodes bool, output StorageNodeSink) error {
func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB map[string]bool, intermediateNodes bool, output StorageNodeSink) error {
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
// skip value nodes
@ -692,15 +687,13 @@ func (sdb *builder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffPathsAtB
valueNodePath := append(node.Path, partialPath...)
encodedPath := trie.HexToCompact(valueNodePath)
leafKey := encodedPath[1:]
if isWatchedStorageKey(watchedKeys, leafKey) {
if err := output(StorageNode{
NodeType: Removed,
Path: node.Path,
NodeValue: []byte{},
LeafKey: leafKey,
}); err != nil {
return err
}
if err := output(StorageNode{
NodeType: Removed,
Path: node.Path,
NodeValue: []byte{},
LeafKey: leafKey,
}); err != nil {
return err
}
case Extension, Branch:
if intermediateNodes {
@ -733,17 +726,3 @@ func isWatchedAddress(watchedAddresses []common.Address, stateLeafKey []byte) bo
}
return false
}
// isWatchedStorageKey is used to check if a storage leaf corresponds to one of the storage slots the builder is configured to watch
func isWatchedStorageKey(watchedKeys []common.Hash, storageLeafKey []byte) bool {
// If we aren't watching any specific addresses, we are watching everything
if len(watchedKeys) == 0 {
return true
}
for _, hashKey := range watchedKeys {
if bytes.Equal(hashKey.Bytes(), storageLeafKey) {
return true
}
}
return false
}

View File

@ -1151,199 +1151,6 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
}
}
func TestBuilderWithWatchedAddressAndStorageKeyList(t *testing.T) {
blocks, chain := testhelpers.MakeChain(4, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)
defer chain.Stop()
block0 = testhelpers.Genesis
block1 = blocks[0]
block2 = blocks[1]
block3 = blocks[2]
block4 = blocks[3]
params := statediff.Params{
WatchedAddresses: []common.Address{testhelpers.Account1Addr, testhelpers.ContractAddr},
WatchedStorageSlots: []common.Hash{slot1StorageKey},
}
builder = statediff.NewBuilder(chain.StateCache())
var tests = []struct {
name string
startingArguments statediff.Args
expected *statediff.StateObject
}{
{
"testEmptyDiff",
statediff.Args{
OldStateRoot: block0.Root(),
NewStateRoot: block0.Root(),
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
},
&statediff.StateObject{
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
Nodes: emptyDiffs,
},
},
{
"testBlock0",
//10000 transferred from testBankAddress to account1Addr
statediff.Args{
OldStateRoot: testhelpers.NullHash,
NewStateRoot: block0.Root(),
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
},
&statediff.StateObject{
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
Nodes: emptyDiffs,
},
},
{
"testBlock1",
//10000 transferred from testBankAddress to account1Addr
statediff.Args{
OldStateRoot: block0.Root(),
NewStateRoot: block1.Root(),
BlockNumber: block1.Number(),
BlockHash: block1.Hash(),
},
&statediff.StateObject{
BlockNumber: block1.Number(),
BlockHash: block1.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock1LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock2",
//1000 transferred from testBankAddress to account1Addr
//1000 transferred from account1Addr to account2Addr
statediff.Args{
OldStateRoot: block1.Root(),
NewStateRoot: block2.Root(),
BlockNumber: block2.Number(),
BlockHash: block2.Hash(),
},
&statediff.StateObject{
BlockNumber: block2.Number(),
BlockHash: block2.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock2LeafNode,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{'\x0b'},
NodeType: sdtypes.Leaf,
LeafKey: slot1StorageKey.Bytes(),
NodeValue: slot1StorageLeafNode,
},
},
},
{
Path: []byte{'\x0e'},
NodeType: sdtypes.Leaf,
LeafKey: testhelpers.Account1LeafKey,
NodeValue: account1AtBlock2LeafNode,
StorageNodes: emptyStorage,
},
},
CodeAndCodeHashes: []sdtypes.CodeAndCodeHash{
{
Hash: testhelpers.CodeHash,
Code: testhelpers.ByteCodeAfterDeployment,
},
},
},
},
{
"testBlock3",
//the contract's storage is changed
//and the block is mined by account 2
statediff.Args{
OldStateRoot: block2.Root(),
NewStateRoot: block3.Root(),
BlockNumber: block3.Number(),
BlockHash: block3.Hash(),
},
&statediff.StateObject{
BlockNumber: block3.Number(),
BlockHash: block3.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock3LeafNode,
StorageNodes: emptyStorage,
},
},
},
},
{
"testBlock4",
statediff.Args{
OldStateRoot: block3.Root(),
NewStateRoot: block4.Root(),
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
},
&statediff.StateObject{
BlockNumber: block4.Number(),
BlockHash: block4.Hash(),
Nodes: []sdtypes.StateNode{
{
Path: []byte{'\x06'},
NodeType: sdtypes.Leaf,
LeafKey: contractLeafKey,
NodeValue: contractAccountAtBlock4LeafNode,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{'\x0b'},
NodeType: sdtypes.Removed,
LeafKey: slot1StorageKey.Bytes(),
NodeValue: []byte{},
},
},
},
},
},
},
}
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.startingArguments, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool { return receivedStateDiffRlp[i] < receivedStateDiffRlp[j] })
sort.Slice(expectedStateDiffRlp, func(i, j int) bool { return expectedStateDiffRlp[i] < expectedStateDiffRlp[j] })
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
t.Logf("Test failed: %s", test.name)
t.Errorf("actual state diff: %+v\nexpected state diff: %+v", diff, test.expected)
}
}
}
func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
blocks, chain := testhelpers.MakeChain(6, testhelpers.Genesis, testhelpers.TestChainGen)
contractLeafKey = testhelpers.AddressToLeafKey(testhelpers.ContractAddr)

View File

@ -26,7 +26,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/types"
)
func sortKeys(data AccountMap) []string {
@ -76,39 +75,27 @@ func findIntersection(a, b []string) []string {
}
}
// loadWatchedAddresses is used to load watched addresses and storage slots to the in-memory write loop params from the db
// loadWatchedAddresses is used to load watched addresses to in-memory write loop params from the db
func loadWatchedAddresses(db *postgres.DB) error {
type Watched struct {
Address string `db:"address"`
Kind int `db:"kind"`
}
var watched []Watched
pgStr := "SELECT address, kind FROM eth.watched_addresses"
pgStr := "SELECT address FROM eth.watched_addresses"
err := db.Select(&watched, pgStr)
if err != nil {
return fmt.Errorf("error loading watched addresses: %v", err)
}
var (
watchedAddresses = []common.Address{}
watchedStorageSlots = []common.Hash{}
)
watchedAddresses := []common.Address{}
for _, entry := range watched {
switch entry.Kind {
case types.WatchedAddress.Int():
watchedAddresses = append(watchedAddresses, common.HexToAddress(entry.Address))
case types.WatchedStorageSlot.Int():
watchedStorageSlots = append(watchedStorageSlots, common.HexToHash(entry.Address))
default:
return fmt.Errorf("Unexpected kind %d", entry.Kind)
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(entry.Address))
}
writeLoopParams.Lock()
defer writeLoopParams.Unlock()
writeLoopParams.WatchedAddresses = watchedAddresses
writeLoopParams.WatchedStorageSlots = watchedStorageSlots
return nil
}

View File

@ -61,10 +61,10 @@ type Indexer interface {
ReportDBMetrics(delay time.Duration, quit <-chan bool)
// Methods used by WatchAddress API/functionality.
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int, kind sdtypes.WatchedAddressType) error
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error
ClearWatchedAddresses(kind sdtypes.WatchedAddressType) error
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
ClearWatchedAddresses() error
}
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
@ -556,8 +556,8 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
return nil
}
// InsertWatchedAddresses inserts the given addresses | storage slots in the database
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error {
// InsertWatchedAddresses inserts the given addresses in the database
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
tx, err := sdi.dbWriter.db.Begin()
if err != nil {
return err
@ -565,8 +565,8 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
defer tx.Rollback()
for _, arg := range args {
_, err = tx.Exec(`INSERT INTO eth.watched_addresses (address, kind, created_at, watched_at) VALUES ($1, $2, $3, $4) ON CONFLICT (address) DO NOTHING`,
arg.Address, kind.Int(), arg.CreatedAt, currentBlockNumber.Uint64())
_, err = tx.Exec(`INSERT INTO eth.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`,
arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
@ -580,8 +580,8 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
return nil
}
// RemoveWatchedAddresses removes the given addresses | storage slots from the database
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error {
// RemoveWatchedAddresses removes the given watched addresses from the database
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
tx, err := sdi.dbWriter.db.Begin()
if err != nil {
return err
@ -589,7 +589,7 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA
defer tx.Rollback()
for _, arg := range args {
_, err = tx.Exec(`DELETE FROM eth.watched_addresses WHERE address = $1 AND kind = $2`, arg.Address, kind.Int())
_, err = tx.Exec(`DELETE FROM eth.watched_addresses WHERE address = $1`, arg.Address)
if err != nil {
return fmt.Errorf("error removing watched_addresses entry: %v", err)
}
@ -603,22 +603,22 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA
return nil
}
// SetWatchedAddresses clears and inserts the given addresses | storage slots in the database
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error {
// SetWatchedAddresses clears and inserts the given addresses in the database
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
tx, err := sdi.dbWriter.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
_, err = tx.Exec(`DELETE FROM eth.watched_addresses WHERE kind = $1`, kind.Int())
_, err = tx.Exec(`DELETE FROM eth.watched_addresses`)
if err != nil {
return fmt.Errorf("error setting watched_addresses table: %v", err)
}
for _, arg := range args {
_, err = tx.Exec(`INSERT INTO eth.watched_addresses (address, kind, created_at, watched_at) VALUES ($1, $2, $3, $4) ON CONFLICT (address) DO NOTHING`,
arg.Address, kind.Int(), arg.CreatedAt, currentBlockNumber.Uint64())
_, err = tx.Exec(`INSERT INTO eth.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`,
arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
if err != nil {
return fmt.Errorf("error setting watched_addresses table: %v", err)
}
@ -632,9 +632,9 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg,
return nil
}
// ClearWatchedAddresses clears all the addresses | storage slots from the database
func (sdi *StateDiffIndexer) ClearWatchedAddresses(kind sdtypes.WatchedAddressType) error {
_, err := sdi.dbWriter.db.Exec(`DELETE FROM eth.watched_addresses WHERE kind = $1`, kind.Int())
// ClearWatchedAddresses clears all the watched addresses from the database
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
_, err := sdi.dbWriter.db.Exec(`DELETE FROM eth.watched_addresses`)
if err != nil {
return fmt.Errorf("error clearing watched_addresses table: %v", err)
}

View File

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
@ -56,8 +55,6 @@ var (
state1CID, state2CID, storageCID cid.Cid
contract1Address, contract2Address, contract3Address, contract4Address string
contract1CreatedAt, contract2CreatedAt, contract3CreatedAt, contract4CreatedAt uint64
slot1StorageKeyHex, slot2StorageKeyHex, slot3StorageKeyHex, slot4StorageKeyHex string
slot1CreatedAt, slot2CreatedAt, slot3CreatedAt, slot4CreatedAt uint64
lastFilledAt, watchedAt1, watchedAt2, watchedAt3 uint64
)
@ -179,15 +176,6 @@ func init() {
contract3CreatedAt = uint64(3)
contract4CreatedAt = uint64(4)
slot1StorageKeyHex = crypto.Keccak256Hash(common.HexToHash("1").Bytes()).Hex()
slot2StorageKeyHex = crypto.Keccak256Hash(common.HexToHash("2").Bytes()).Hex()
slot3StorageKeyHex = crypto.Keccak256Hash(common.HexToHash("3").Bytes()).Hex()
slot4StorageKeyHex = crypto.Keccak256Hash(common.HexToHash("4").Bytes()).Hex()
slot1CreatedAt = uint64(1)
slot2CreatedAt = uint64(2)
slot3CreatedAt = uint64(3)
slot4CreatedAt = uint64(4)
lastFilledAt = uint64(0)
watchedAt1 = uint64(10)
watchedAt2 = uint64(15)
@ -696,7 +684,6 @@ func TestWatchAddressMethods(t *testing.T) {
type res struct {
Address string `db:"address"`
Kind int `db:"kind"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
@ -718,21 +705,19 @@ func TestWatchAddressMethods(t *testing.T) {
expectedData := []res{
{
Address: contract1Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)), sdtypes.WatchedAddress)
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
rows := []res{}
err = db.Select(&rows, pgStr)
@ -760,28 +745,25 @@ func TestWatchAddressMethods(t *testing.T) {
expectedData := []res{
{
Address: contract1Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)), sdtypes.WatchedAddress)
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
rows := []res{}
err = db.Select(&rows, pgStr)
@ -809,14 +791,13 @@ func TestWatchAddressMethods(t *testing.T) {
expectedData := []res{
{
Address: contract1Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.RemoveWatchedAddresses(args, sdtypes.WatchedAddress)
ind.RemoveWatchedAddresses(args)
rows := []res{}
err = db.Select(&rows, pgStr)
@ -843,7 +824,7 @@ func TestWatchAddressMethods(t *testing.T) {
}
expectedData := []res{}
ind.RemoveWatchedAddresses(args, sdtypes.WatchedAddress)
ind.RemoveWatchedAddresses(args)
rows := []res{}
err = db.Select(&rows, pgStr)
@ -875,28 +856,25 @@ func TestWatchAddressMethods(t *testing.T) {
expectedData := []res{
{
Address: contract1Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)), sdtypes.WatchedAddress)
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
rows := []res{}
err = db.Select(&rows, pgStr)
@ -928,28 +906,25 @@ func TestWatchAddressMethods(t *testing.T) {
expectedData := []res{
{
Address: contract4Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
Kind: sdtypes.WatchedAddress.Int(),
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)), sdtypes.WatchedAddress)
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
rows := []res{}
err = db.Select(&rows, pgStr)
@ -966,7 +941,7 @@ func TestWatchAddressMethods(t *testing.T) {
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses(sdtypes.WatchedAddress)
ind.ClearWatchedAddresses()
rows := []res{}
err = db.Select(&rows, pgStr)
@ -983,304 +958,7 @@ func TestWatchAddressMethods(t *testing.T) {
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses(sdtypes.WatchedAddress)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
// Watched storage slots
// Reset the db.
tearDown(t)
t.Run("Insert watched storage slots", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
expectedData := []res{
{
Address: slot1StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: slot2StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)), sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched storage slots (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
expectedData := []res{
{
Address: slot1StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: slot2StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: slot3StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)), sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched storage slots", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
expectedData := []res{
{
Address: slot1StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.RemoveWatchedAddresses(args, sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched storage slots (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
expectedData := []res{}
ind.RemoveWatchedAddresses(args, sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched storage slots", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
}
expectedData := []res{
{
Address: slot1StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: slot2StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: slot3StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)), sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched storage slots (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: slot4StorageKeyHex,
CreatedAt: slot4CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
}
expectedData := []res{
{
Address: slot4StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: slot2StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: slot3StorageKeyHex,
Kind: sdtypes.WatchedStorageSlot.Int(),
CreatedAt: slot3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)), sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched storage slots", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses(sdtypes.WatchedStorageSlot)
rows := []res{}
err = db.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
shared.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched storage slots (empty table)", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses(sdtypes.WatchedStorageSlot)
ind.ClearWatchedAddresses()
rows := []res{}
err = db.Select(&rows, pgStr)

View File

@ -55,6 +55,7 @@ const (
defaultRetryLimit = 3 // default retry limit once deadlock is detected.
deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html
typeAssertionFailed = "type assertion failed"
unexpectedOperation = "unexpected operation"
)
var writeLoopParams = ParamsWithMutex{
@ -734,9 +735,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
return err
}
// Performs one of following operations on the watched addresses | storage slots in writeLoopParams and the db:
// AddAddresses | RemoveAddresses | SetAddresses | ClearAddresses
// AddStorageSlots | RemoveStorageSlots | SetStorageSlots | ClearStorageSlots
// Performs one of following operations on the watched addresses in writeLoopParams and the db:
// Add | Remove | Set | Clear
func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg) error {
// lock writeLoopParams for a write
writeLoopParams.Lock()
@ -746,7 +746,7 @@ func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg
currentBlockNumber := sds.BlockChain.CurrentBlock().Number()
switch operation {
case AddAddresses:
case Add:
// filter out args having an already watched address with a warning
filteredArgs, ok := funk.Filter(args, func(arg WatchAddressArg) bool {
if funk.Contains(writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) {
@ -756,7 +756,7 @@ func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg
return true
}).([]WatchAddressArg)
if !ok {
return fmt.Errorf("AddAddresses: filtered args %s", typeAssertionFailed)
return fmt.Errorf("Add: filtered args %s", typeAssertionFailed)
}
// get addresses from the filtered args
@ -764,60 +764,60 @@ func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("AddAddresses: filtered addresses %s", typeAssertionFailed)
return fmt.Errorf("Add: filtered addresses %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, WatchedAddress)
err := sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...)
case RemoveAddresses:
case Remove:
// get addresses from args
argAddresses, ok := funk.Map(args, func(arg WatchAddressArg) common.Address {
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("RemoveAddresses: mapped addresses %s", typeAssertionFailed)
return fmt.Errorf("Remove: mapped addresses %s", typeAssertionFailed)
}
// remove the provided addresses from currently watched addresses
addresses, ok := funk.Subtract(writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
if !ok {
return fmt.Errorf("RemoveAddresses: filtered addresses %s", typeAssertionFailed)
return fmt.Errorf("Remove: filtered addresses %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.RemoveWatchedAddresses(args, WatchedAddress)
err := sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = addresses
case SetAddresses:
case Set:
// get addresses from args
argAddresses, ok := funk.Map(args, func(arg WatchAddressArg) common.Address {
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("SetAddresses: mapped addresses %s", typeAssertionFailed)
return fmt.Errorf("Set: mapped addresses %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.SetWatchedAddresses(args, currentBlockNumber, WatchedAddress)
err := sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedAddresses = argAddresses
case ClearAddresses:
case Clear:
// update the db
err := sds.indexer.ClearWatchedAddresses(WatchedAddress)
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
}
@ -825,85 +825,8 @@ func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg
// update in-memory params
writeLoopParams.WatchedAddresses = []common.Address{}
case AddStorageSlots:
// filter out args having an already watched storage slot with a warning
filteredArgs, ok := funk.Filter(args, func(arg WatchAddressArg) bool {
if funk.Contains(writeLoopParams.WatchedStorageSlots, common.HexToHash(arg.Address)) {
log.Warn("StorageSlot already being watched", "address", arg.Address)
return false
}
return true
}).([]WatchAddressArg)
if !ok {
return fmt.Errorf("AddStorageSlots: filtered args %s", typeAssertionFailed)
}
// get storage slots from the filtered args
filteredStorageSlots, ok := funk.Map(filteredArgs, func(arg WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("AddStorageSlots: filtered storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedStorageSlots = append(writeLoopParams.WatchedStorageSlots, filteredStorageSlots...)
case RemoveStorageSlots:
// get storage slots from args
argStorageSlots, ok := funk.Map(args, func(arg WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("RemoveStorageSlots: mapped storage slots %s", typeAssertionFailed)
}
// remove the provided storage slots from currently watched storage slots
storageSlots, ok := funk.Subtract(writeLoopParams.WatchedStorageSlots, argStorageSlots).([]common.Hash)
if !ok {
return fmt.Errorf("RemoveStorageSlots: filtered storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.RemoveWatchedAddresses(args, WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedStorageSlots = storageSlots
case SetStorageSlots:
// get storage slots from args
argStorageSlots, ok := funk.Map(args, func(arg WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("SetStorageSlots: mapped storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.indexer.SetWatchedAddresses(args, currentBlockNumber, WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
writeLoopParams.WatchedStorageSlots = argStorageSlots
case ClearStorageSlots:
err := sds.indexer.ClearWatchedAddresses(WatchedStorageSlot)
if err != nil {
return err
}
writeLoopParams.WatchedStorageSlots = []common.Hash{}
default:
return fmt.Errorf("Unexpected operation %s", operation)
return fmt.Errorf("%s %s", unexpectedOperation, operation)
}
return nil

View File

@ -42,18 +42,18 @@ func (sdi *Indexer) PushCodeAndCodeHash(tx *indexer.BlockTx, codeAndCodeHash sdt
func (sdi *Indexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {}
func (sdi *Indexer) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int, kind sdtypes.WatchedAddressType) error {
func (sdi *Indexer) InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error {
return nil
}
func (sdi *Indexer) RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error {
func (sdi *Indexer) RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error {
return nil
}
func (sdi *Indexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error {
func (sdi *Indexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
return nil
}
func (sdi *Indexer) ClearWatchedAddresses(kind sdtypes.WatchedAddressType) error {
func (sdi *Indexer) ClearWatchedAddresses() error {
return nil
}

View File

@ -37,7 +37,10 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
var typeAssertionFailed = "type assertion failed"
var (
typeAssertionFailed = "type assertion failed"
unexpectedOperation = "unexpected operation"
)
// MockStateDiffService is a mock state diff service
type MockStateDiffService struct {
@ -349,7 +352,7 @@ func (sds *MockStateDiffService) WatchAddress(operation statediff.OperationType,
currentBlockNumber := sds.BlockChain.CurrentBlock().Number()
switch operation {
case statediff.AddAddresses:
case statediff.Add:
// filter out args having an already watched address with a warning
filteredArgs, ok := funk.Filter(args, func(arg sdtypes.WatchAddressArg) bool {
if funk.Contains(sds.writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) {
@ -359,7 +362,7 @@ func (sds *MockStateDiffService) WatchAddress(operation statediff.OperationType,
return true
}).([]sdtypes.WatchAddressArg)
if !ok {
return fmt.Errorf("AddAddresses: filtered args %s", typeAssertionFailed)
return fmt.Errorf("Add: filtered args %s", typeAssertionFailed)
}
// get addresses from the filtered args
@ -367,60 +370,60 @@ func (sds *MockStateDiffService) WatchAddress(operation statediff.OperationType,
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("AddAddresses: filtered addresses %s", typeAssertionFailed)
return fmt.Errorf("Add: filtered addresses %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, sdtypes.WatchedAddress)
err := sds.Indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = append(sds.writeLoopParams.WatchedAddresses, filteredAddresses...)
case statediff.RemoveAddresses:
case statediff.Remove:
// get addresses from args
argAddresses, ok := funk.Map(args, func(arg sdtypes.WatchAddressArg) common.Address {
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("RemoveAddresses: mapped addresses %s", typeAssertionFailed)
return fmt.Errorf("Remove: mapped addresses %s", typeAssertionFailed)
}
// remove the provided addresses from currently watched addresses
addresses, ok := funk.Subtract(sds.writeLoopParams.WatchedAddresses, argAddresses).([]common.Address)
if !ok {
return fmt.Errorf("RemoveAddresses: filtered addresses %s", typeAssertionFailed)
return fmt.Errorf("Remove: filtered addresses %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.RemoveWatchedAddresses(args, sdtypes.WatchedAddress)
err := sds.Indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = addresses
case statediff.SetAddresses:
case statediff.Set:
// get addresses from args
argAddresses, ok := funk.Map(args, func(arg sdtypes.WatchAddressArg) common.Address {
return common.HexToAddress(arg.Address)
}).([]common.Address)
if !ok {
return fmt.Errorf("SetAddresses: mapped addresses %s", typeAssertionFailed)
return fmt.Errorf("Set: mapped addresses %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.SetWatchedAddresses(args, currentBlockNumber, sdtypes.WatchedAddress)
err := sds.Indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedAddresses = argAddresses
case statediff.ClearAddresses:
case statediff.Clear:
// update the db
err := sds.Indexer.ClearWatchedAddresses(sdtypes.WatchedAddress)
err := sds.Indexer.ClearWatchedAddresses()
if err != nil {
return err
}
@ -428,85 +431,8 @@ func (sds *MockStateDiffService) WatchAddress(operation statediff.OperationType,
// update in-memory params
sds.writeLoopParams.WatchedAddresses = []common.Address{}
case statediff.AddStorageSlots:
// filter out args having an already watched storage slot with a warning
filteredArgs, ok := funk.Filter(args, func(arg sdtypes.WatchAddressArg) bool {
if funk.Contains(sds.writeLoopParams.WatchedStorageSlots, common.HexToHash(arg.Address)) {
log.Warn("StorageSlot already being watched", "address", arg.Address)
return false
}
return true
}).([]sdtypes.WatchAddressArg)
if !ok {
return fmt.Errorf("AddStorageSlots: filtered args %s", typeAssertionFailed)
}
// get storage slots from the filtered args
filteredStorageSlots, ok := funk.Map(filteredArgs, func(arg sdtypes.WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("AddStorageSlots: filtered storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, sdtypes.WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedStorageSlots = append(sds.writeLoopParams.WatchedStorageSlots, filteredStorageSlots...)
case statediff.RemoveStorageSlots:
// get storage slots from args
argStorageSlots, ok := funk.Map(args, func(arg sdtypes.WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("RemoveStorageSlots: mapped storage slots %s", typeAssertionFailed)
}
// remove the provided storage slots from currently watched storage slots
storageSlots, ok := funk.Subtract(sds.writeLoopParams.WatchedStorageSlots, argStorageSlots).([]common.Hash)
if !ok {
return fmt.Errorf("RemoveStorageSlots: filtered storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.RemoveWatchedAddresses(args, sdtypes.WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedStorageSlots = storageSlots
case statediff.SetStorageSlots:
// get storage slots from args
argStorageSlots, ok := funk.Map(args, func(arg sdtypes.WatchAddressArg) common.Hash {
return common.HexToHash(arg.Address)
}).([]common.Hash)
if !ok {
return fmt.Errorf("SetStorageSlots: mapped storage slots %s", typeAssertionFailed)
}
// update the db
err := sds.Indexer.SetWatchedAddresses(args, currentBlockNumber, sdtypes.WatchedStorageSlot)
if err != nil {
return err
}
// update in-memory params
sds.writeLoopParams.WatchedStorageSlots = argStorageSlots
case statediff.ClearStorageSlots:
err := sds.Indexer.ClearWatchedAddresses(sdtypes.WatchedStorageSlot)
if err != nil {
return err
}
sds.writeLoopParams.WatchedStorageSlots = []common.Hash{}
default:
return fmt.Errorf("Unexpected operation %s", operation)
return fmt.Errorf("%s %s", unexpectedOperation, operation)
}
return nil

View File

@ -28,7 +28,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
@ -273,23 +272,6 @@ func testWatchAddressAPI(t *testing.T) {
contract3CreatedAt = uint64(3)
contract4CreatedAt = uint64(4)
slot1 = common.HexToHash("1")
slot2 = common.HexToHash("2")
slot3 = common.HexToHash("3")
slot4 = common.HexToHash("4")
slot1StorageKey = crypto.Keccak256Hash(slot1.Bytes())
slot2StorageKey = crypto.Keccak256Hash(slot2.Bytes())
slot3StorageKey = crypto.Keccak256Hash(slot3.Bytes())
slot4StorageKey = crypto.Keccak256Hash(slot4.Bytes())
slot1StorageKeyHex = crypto.Keccak256Hash(slot1.Bytes()).Hex()
slot2StorageKeyHex = crypto.Keccak256Hash(slot2.Bytes()).Hex()
slot3StorageKeyHex = crypto.Keccak256Hash(slot3.Bytes()).Hex()
slot4StorageKeyHex = crypto.Keccak256Hash(slot4.Bytes()).Hex()
slot1CreatedAt = uint64(1)
slot2CreatedAt = uint64(2)
slot3CreatedAt = uint64(3)
slot4CreatedAt = uint64(4)
args1 = []sdtypes.WatchAddressArg{
{
Address: contract1Address,
@ -424,141 +406,6 @@ func testWatchAddressAPI(t *testing.T) {
expectedParams9 = statediff.Params{
WatchedAddresses: []common.Address{},
}
args10 = []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
startingParams10 = statediff.Params{
WatchedStorageSlots: []common.Hash{},
}
expectedParams10 = statediff.Params{
WatchedStorageSlots: []common.Hash{
slot1StorageKey,
slot2StorageKey,
},
}
args11 = []sdtypes.WatchAddressArg{
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
startingParams11 = expectedParams10
expectedParams11 = statediff.Params{
WatchedStorageSlots: []common.Hash{
slot1StorageKey,
slot2StorageKey,
slot3StorageKey,
},
}
args12 = []sdtypes.WatchAddressArg{
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
startingParams12 = expectedParams11
expectedParams12 = statediff.Params{
WatchedStorageSlots: []common.Hash{
slot1StorageKey,
},
}
args13 = []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
}
startingParams13 = expectedParams12
expectedParams13 = statediff.Params{
WatchedStorageSlots: []common.Hash{},
}
args14 = []sdtypes.WatchAddressArg{
{
Address: slot1StorageKeyHex,
CreatedAt: slot1CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
}
startingParams14 = expectedParams13
expectedParams14 = statediff.Params{
WatchedStorageSlots: []common.Hash{
slot1StorageKey,
slot2StorageKey,
slot3StorageKey,
},
}
args15 = []sdtypes.WatchAddressArg{
{
Address: slot4StorageKeyHex,
CreatedAt: slot4CreatedAt,
},
{
Address: slot2StorageKeyHex,
CreatedAt: slot2CreatedAt,
},
{
Address: slot3StorageKeyHex,
CreatedAt: slot3CreatedAt,
},
}
startingParams15 = expectedParams14
expectedParams15 = statediff.Params{
WatchedStorageSlots: []common.Hash{
slot4StorageKey,
slot2StorageKey,
slot3StorageKey,
},
}
args16 = []sdtypes.WatchAddressArg{}
startingParams16 = expectedParams15
expectedParams16 = statediff.Params{
WatchedStorageSlots: []common.Hash{},
}
args17 = []sdtypes.WatchAddressArg{}
startingParams17 = expectedParams15
expectedParams17 = statediff.Params{
WatchedStorageSlots: []common.Hash{},
}
args18 = []sdtypes.WatchAddressArg{}
startingParams18 = expectedParams17
expectedParams18 = statediff.Params{
WatchedStorageSlots: []common.Hash{},
}
)
tests := []struct {
@ -572,7 +419,7 @@ func testWatchAddressAPI(t *testing.T) {
// addresses tests
{
"testAddAddresses",
statediff.AddAddresses,
statediff.Add,
args1,
startingParams1,
expectedParams1,
@ -580,7 +427,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testAddAddressesSomeWatched",
statediff.AddAddresses,
statediff.Add,
args2,
startingParams2,
expectedParams2,
@ -588,7 +435,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testRemoveAddresses",
statediff.RemoveAddresses,
statediff.Remove,
args3,
startingParams3,
expectedParams3,
@ -596,7 +443,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testRemoveAddressesSomeWatched",
statediff.RemoveAddresses,
statediff.Remove,
args4,
startingParams4,
expectedParams4,
@ -604,7 +451,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testSetAddresses",
statediff.SetAddresses,
statediff.Set,
args5,
startingParams5,
expectedParams5,
@ -612,7 +459,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testSetAddressesSomeWatched",
statediff.SetAddresses,
statediff.Set,
args6,
startingParams6,
expectedParams6,
@ -620,7 +467,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testSetAddressesEmtpyArgs",
statediff.SetAddresses,
statediff.Set,
args7,
startingParams7,
expectedParams7,
@ -628,7 +475,7 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testClearAddresses",
statediff.ClearAddresses,
statediff.Clear,
args8,
startingParams8,
expectedParams8,
@ -636,95 +483,21 @@ func testWatchAddressAPI(t *testing.T) {
},
{
"testClearAddressesEmpty",
statediff.ClearAddresses,
statediff.Clear,
args9,
startingParams9,
expectedParams9,
nil,
},
// storage slots tests
{
"testAddStorageSlots",
statediff.AddStorageSlots,
args10,
startingParams10,
expectedParams10,
nil,
},
{
"testAddStorageSlotsSomeWatched",
statediff.AddStorageSlots,
args11,
startingParams11,
expectedParams11,
nil,
},
{
"testRemoveStorageSlots",
statediff.RemoveStorageSlots,
args12,
startingParams12,
expectedParams12,
nil,
},
{
"testRemoveStorageSlotsSomeWatched",
statediff.RemoveStorageSlots,
args13,
startingParams13,
expectedParams13,
nil,
},
{
"testSetStorageSlots",
statediff.SetStorageSlots,
args14,
startingParams14,
expectedParams14,
nil,
},
{
"testSetStorageSlotsSomeWatched",
statediff.SetStorageSlots,
args15,
startingParams15,
expectedParams15,
nil,
},
{
"testSetStorageSlotsEmtpyArgs",
statediff.SetStorageSlots,
args16,
startingParams16,
expectedParams16,
nil,
},
{
"testClearStorageSlots",
statediff.ClearStorageSlots,
args17,
startingParams17,
expectedParams17,
nil,
},
{
"testClearStorageSlotsEmpty",
statediff.ClearStorageSlots,
args18,
startingParams18,
expectedParams18,
nil,
},
// invalid args
{
"testInvalidOperation",
"WrongOp",
args18,
startingParams18,
args9,
startingParams9,
statediff.Params{},
fmt.Errorf("Unexpected operation WrongOp"),
fmt.Errorf("%s WrongOp", unexpectedOperation),
},
}

View File

@ -51,7 +51,6 @@ type Params struct {
IncludeTD bool
IncludeCode bool
WatchedAddresses []common.Address
WatchedStorageSlots []common.Hash
}
// ParamsWithMutex allows to lock the parameters while they are being updated | read from
@ -123,13 +122,8 @@ type accountWrapper struct {
type OperationType string
const (
AddAddresses OperationType = "AddAddresses"
RemoveAddresses OperationType = "RemoveAddresses"
SetAddresses OperationType = "SetAddresses"
ClearAddresses OperationType = "ClearAddresses"
AddStorageSlots OperationType = "AddStorageSlots"
RemoveStorageSlots OperationType = "RemoveStorageSlots"
SetStorageSlots OperationType = "SetStorageSlots"
ClearStorageSlots OperationType = "ClearStorageSlots"
Add OperationType = "Add"
Remove OperationType = "Remove"
Set OperationType = "Set"
Clear OperationType = "Clear"
)

View File

@ -77,26 +77,7 @@ type CodeSink func(CodeAndCodeHash) error
// WatchAddressArg is a arg type for WatchAddress API
type WatchAddressArg struct {
// Address represents common.Address | common.Hash
// Address represents common.Address
Address string
CreatedAt uint64
}
// WatchedAddressType for denoting watched: address | storage slot
type WatchedAddressType string
const (
WatchedAddress WatchedAddressType = "WatchedAddress"
WatchedStorageSlot WatchedAddressType = "WatchedStorageSlot"
)
func (n WatchedAddressType) Int() int {
switch n {
case WatchedAddress:
return 0
case WatchedStorageSlot:
return 1
default:
return -1
}
}