diff --git a/go.mod b/go.mod index 0f94c2611..3baf72200 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 github.com/stretchr/testify v1.7.0 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 + github.com/thoas/go-funk v0.9.1 github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a diff --git a/go.sum b/go.sum index 0097b96f7..76d8234e5 100644 --- a/go.sum +++ b/go.sum @@ -470,6 +470,8 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M= +github.com/thoas/go-funk v0.9.1/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4= github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= diff --git a/statediff/helpers.go b/statediff/helpers.go index 209e1d724..4d9b886e4 100644 --- a/statediff/helpers.go +++ b/statediff/helpers.go @@ -76,13 +76,14 @@ func findIntersection(a, b []string) []string { } } -// loadWatched is used to load watched addresses and storage slots to the in-memory write loop params from the db -func loadWatched(db *postgres.DB) error { +// loadWatchedAddresses is used to load watched addresses and storage slots to the in-memory write loop params from the db +func loadWatchedAddresses(db *postgres.DB) error { type Watched struct { Address string `db:"address"` Kind int `db:"kind"` } var watched []Watched + pgStr := "SELECT address, kind FROM eth.watched_addresses" err := db.Select(&watched, pgStr) if err != nil { @@ -109,103 +110,3 @@ func loadWatched(db *postgres.DB) error { return nil } - -// removeAddresses is used to remove given addresses from a list of addresses -func removeAddresses(addresses []common.Address, addressesToRemove []common.Address) []common.Address { - filteredAddresses := []common.Address{} - - for _, address := range addresses { - if idx := containsAddress(addressesToRemove, address); idx == -1 { - filteredAddresses = append(filteredAddresses, address) - } - } - - return filteredAddresses -} - -// removeAddresses is used to remove given storage slots from a list of storage slots -func removeStorageSlots(storageSlots []common.Hash, storageSlotsToRemove []common.Hash) []common.Hash { - filteredStorageSlots := []common.Hash{} - - for _, address := range storageSlots { - if idx := containsStorageSlot(storageSlotsToRemove, address); idx == -1 { - filteredStorageSlots = append(filteredStorageSlots, address) - } - } - - return filteredStorageSlots -} - -// containsAddress is used to check if an address is present in the provided list of addresses -// return the index if found else -1 -func containsAddress(addresses []common.Address, address common.Address) int { - for idx, addr := range addresses { - if addr == address { - return idx - } - } - return -1 -} - -// containsAddress is used to check if a storage slot is present in the provided list of storage slots -// return the index if found else -1 -func containsStorageSlot(storageSlots []common.Hash, storageSlot common.Hash) int { - for idx, slot := range storageSlots { - if slot == storageSlot { - return idx - } - } - return -1 -} - -// getAddresses is used to get the list of addresses from a list of WatchAddressArgs -func getAddresses(args []types.WatchAddressArg) []common.Address { - addresses := make([]common.Address, len(args)) - for idx, arg := range args { - addresses[idx] = common.HexToAddress(arg.Address) - } - - return addresses -} - -// getStorageSlots is used to get the list of storage slots from a list of WatchAddressArgs -func getStorageSlots(args []types.WatchAddressArg) []common.Hash { - storageSlots := make([]common.Hash, len(args)) - for idx, arg := range args { - storageSlots[idx] = common.HexToHash(arg.Address) - } - - return storageSlots -} - -// filterAddressArgs filters out the args having an address from a given list of addresses -func filterAddressArgs(args []types.WatchAddressArg, addressesToRemove []common.Address) ([]types.WatchAddressArg, []common.Address) { - filteredArgs := []types.WatchAddressArg{} - filteredAddresses := []common.Address{} - - for _, arg := range args { - address := common.HexToAddress(arg.Address) - if idx := containsAddress(addressesToRemove, address); idx == -1 { - filteredArgs = append(filteredArgs, arg) - filteredAddresses = append(filteredAddresses, address) - } - } - - return filteredArgs, filteredAddresses -} - -// filterStorageSlotArgs filters out the args having a storage slot from a given list of storage slots -func filterStorageSlotArgs(args []types.WatchAddressArg, storageSlotsToRemove []common.Hash) ([]types.WatchAddressArg, []common.Hash) { - filteredArgs := []types.WatchAddressArg{} - filteredStorageSlots := []common.Hash{} - - for _, arg := range args { - storageSlot := common.HexToHash(arg.Address) - if idx := containsStorageSlot(storageSlotsToRemove, storageSlot); idx == -1 { - filteredArgs = append(filteredArgs, arg) - filteredStorageSlots = append(filteredStorageSlots, storageSlot) - } - } - - return filteredArgs, filteredStorageSlots -} diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 8ec34549b..5132f4942 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -61,10 +61,10 @@ type Indexer interface { ReportDBMetrics(delay time.Duration, quit <-chan bool) // Methods used by WatchAddress API/functionality. - InsertWatched(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int, kind sdtypes.WatchedAddressType) error - RemoveWatched(addresses []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error - SetWatched(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error - ClearWatched(kind sdtypes.WatchedAddressType) error + InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int, kind sdtypes.WatchedAddressType) error + RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error + SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error + ClearWatchedAddresses(kind sdtypes.WatchedAddressType) error } // StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects @@ -557,7 +557,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd } // InsertWatchedAddresses inserts the given addresses | storage slots in the database -func (sdi *StateDiffIndexer) InsertWatched(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error { +func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error { tx, err := sdi.dbWriter.db.Begin() if err != nil { return err @@ -581,7 +581,7 @@ func (sdi *StateDiffIndexer) InsertWatched(args []sdtypes.WatchAddressArg, curre } // RemoveWatchedAddresses removes the given addresses | storage slots from the database -func (sdi *StateDiffIndexer) RemoveWatched(args []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error { +func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg, kind sdtypes.WatchedAddressType) error { tx, err := sdi.dbWriter.db.Begin() if err != nil { return err @@ -603,8 +603,8 @@ func (sdi *StateDiffIndexer) RemoveWatched(args []sdtypes.WatchAddressArg, kind return nil } -// SetWatched clears and inserts the given addresses | storage slots in the database -func (sdi *StateDiffIndexer) SetWatched(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error { +// SetWatchedAddresses clears and inserts the given addresses | storage slots in the database +func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int, kind sdtypes.WatchedAddressType) error { tx, err := sdi.dbWriter.db.Begin() if err != nil { return err @@ -633,7 +633,7 @@ func (sdi *StateDiffIndexer) SetWatched(args []sdtypes.WatchAddressArg, currentB } // ClearWatchedAddresses clears all the addresses | storage slots from the database -func (sdi *StateDiffIndexer) ClearWatched(kind sdtypes.WatchedAddressType) error { +func (sdi *StateDiffIndexer) ClearWatchedAddresses(kind sdtypes.WatchedAddressType) error { _, err := sdi.dbWriter.db.Exec(`DELETE FROM eth.watched_addresses WHERE kind = $1`, kind.Int()) if err != nil { return fmt.Errorf("error clearing watched_addresses table: %v", err) diff --git a/statediff/service.go b/statediff/service.go index 75268596c..39e874d98 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" + "github.com/thoas/go-funk" ind "github.com/ethereum/go-ethereum/statediff/indexer" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" @@ -49,10 +50,11 @@ import ( ) const ( - chainEventChanSize = 20000 - genesisBlockNumber = 0 - defaultRetryLimit = 3 // default retry limit once deadlock is detected. - deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html + chainEventChanSize = 20000 + genesisBlockNumber = 0 + defaultRetryLimit = 3 // default retry limit once deadlock is detected. + deadlockDetected = "deadlock detected" // 40P01 https://www.postgresql.org/docs/current/errcodes-appendix.html + typeAssertionFailed = "type assertion failed" ) var writeLoopParams = ParamsWithMutex{ @@ -211,7 +213,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params stack.RegisterLifecycle(sds) stack.RegisterAPIs(sds.APIs()) - err = loadWatched(db) + err = loadWatchedAddresses(db) if err != nil { return err } @@ -734,7 +736,7 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo return err } -// Performs one of foll. operations on the watched addresses | storage slots in writeLoopParams and the db: +// Performs one of following operations on the watched addresses | storage slots in writeLoopParams and the db: // AddAddresses | RemoveAddresses | SetAddresses | ClearAddresses // AddStorageSlots | RemoveStorageSlots | SetStorageSlots | ClearStorageSlots func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg) error { @@ -747,93 +749,155 @@ func (sds *Service) WatchAddress(operation OperationType, args []WatchAddressArg switch operation { case AddAddresses: - addressesToRemove := []common.Address{} - for _, arg := range args { - // Check if address is already being watched - // Throw a warning and continue if found - address := common.HexToAddress(arg.Address) - if containsAddress(writeLoopParams.WatchedAddresses, address) != -1 { + // filter out args having an already watched address with a warning + filteredArgs, ok := funk.Filter(args, func(arg WatchAddressArg) bool { + if funk.Contains(writeLoopParams.WatchedAddresses, common.HexToAddress(arg.Address)) { log.Warn("Address already being watched", "address", arg.Address) - addressesToRemove = append(addressesToRemove, address) - continue + return false } + return true + }).([]WatchAddressArg) + if !ok { + return fmt.Errorf("AddAddresses: filtered args %s", typeAssertionFailed) } - // remove already watched addresses - filteredArgs, filteredAddresses := filterAddressArgs(args, addressesToRemove) + // get addresses from the filtered args + filteredAddresses, ok := funk.Map(filteredArgs, func(arg WatchAddressArg) common.Address { + return common.HexToAddress(arg.Address) + }).([]common.Address) + if !ok { + return fmt.Errorf("AddAddresses: filtered addresses %s", typeAssertionFailed) + } - err := sds.indexer.InsertWatched(filteredArgs, currentBlockNumber, WatchedAddress) + // update the db + err := sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, WatchedAddress) if err != nil { return err } + // update in-memory params writeLoopParams.WatchedAddresses = append(writeLoopParams.WatchedAddresses, filteredAddresses...) case RemoveAddresses: - addresses := getAddresses(args) + // get addresses from args + argAddresses, ok := funk.Map(args, func(arg WatchAddressArg) common.Address { + return common.HexToAddress(arg.Address) + }).([]common.Address) + if !ok { + return fmt.Errorf("RemoveAddresses: mapped addresses %s", typeAssertionFailed) + } - err := sds.indexer.RemoveWatched(args, WatchedAddress) + // remove the provided addresses from currently watched addresses + addresses, ok := funk.Subtract(writeLoopParams.WatchedAddresses, argAddresses).([]common.Address) + if !ok { + return fmt.Errorf("RemoveAddresses: filtered addresses %s", typeAssertionFailed) + } + + // update the db + err := sds.indexer.RemoveWatchedAddresses(args, WatchedAddress) if err != nil { return err } - writeLoopParams.WatchedAddresses = removeAddresses(writeLoopParams.WatchedAddresses, addresses) - case SetAddresses: - err := sds.indexer.SetWatched(args, currentBlockNumber, WatchedAddress) - if err != nil { - return err - } - - addresses := getAddresses(args) + // update in-memory params writeLoopParams.WatchedAddresses = addresses - case ClearAddresses: - err := sds.indexer.ClearWatched(WatchedAddress) + case SetAddresses: + // get addresses from args + argAddresses, ok := funk.Map(args, func(arg WatchAddressArg) common.Address { + return common.HexToAddress(arg.Address) + }).([]common.Address) + if !ok { + return fmt.Errorf("SetAddresses: mapped addresses %s", typeAssertionFailed) + } + + // update the db + err := sds.indexer.SetWatchedAddresses(args, currentBlockNumber, WatchedAddress) if err != nil { return err } + // update in-memory params + writeLoopParams.WatchedAddresses = argAddresses + case ClearAddresses: + // update the db + err := sds.indexer.ClearWatchedAddresses(WatchedAddress) + if err != nil { + return err + } + + // update in-memory params writeLoopParams.WatchedAddresses = nil case AddStorageSlots: - storageSlotsToRemove := []common.Hash{} - for _, arg := range args { - // Check if address is already being watched - // Throw a warning and continue if found - storageSlot := common.HexToHash(arg.Address) - if containsStorageSlot(writeLoopParams.WatchedStorageSlots, storageSlot) != -1 { - log.Warn("StorageSlot already being watched", "storage slot", arg.Address) - storageSlotsToRemove = append(storageSlotsToRemove, storageSlot) - continue + // filter out args having an already watched storage slot with a warning + filteredArgs, ok := funk.Filter(args, func(arg WatchAddressArg) bool { + if funk.Contains(writeLoopParams.WatchedStorageSlots, common.HexToHash(arg.Address)) { + log.Warn("StorageSlot already being watched", "address", arg.Address) + return false } + return true + }).([]WatchAddressArg) + if !ok { + return fmt.Errorf("AddStorageSlots: filtered args %s", typeAssertionFailed) } - // remove already watched addresses - filteredArgs, filteredStorageSlots := filterStorageSlotArgs(args, storageSlotsToRemove) + // get storage slots from the filtered args + filteredStorageSlots, ok := funk.Map(filteredArgs, func(arg WatchAddressArg) common.Hash { + return common.HexToHash(arg.Address) + }).([]common.Hash) + if !ok { + return fmt.Errorf("AddStorageSlots: filtered storage slots %s", typeAssertionFailed) + } - err := sds.indexer.InsertWatched(filteredArgs, currentBlockNumber, WatchedStorageSlot) + // update the db + err := sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber, WatchedStorageSlot) if err != nil { return err } + // update in-memory params writeLoopParams.WatchedStorageSlots = append(writeLoopParams.WatchedStorageSlots, filteredStorageSlots...) case RemoveStorageSlots: - storageSlots := getStorageSlots(args) + // get storage slots from args + argStorageSlots, ok := funk.Map(args, func(arg WatchAddressArg) common.Hash { + return common.HexToHash(arg.Address) + }).([]common.Hash) + if !ok { + return fmt.Errorf("RemoveStorageSlots: mapped storage slots %s", typeAssertionFailed) + } - err := sds.indexer.RemoveWatched(args, WatchedStorageSlot) + // remove the provided storage slots from currently watched storage slots + storageSlots, ok := funk.Subtract(writeLoopParams.WatchedStorageSlots, argStorageSlots).([]common.Hash) + if !ok { + return fmt.Errorf("RemoveStorageSlots: filtered storage slots %s", typeAssertionFailed) + } + + // update the db + err := sds.indexer.RemoveWatchedAddresses(args, WatchedStorageSlot) if err != nil { return err } - writeLoopParams.WatchedStorageSlots = removeStorageSlots(writeLoopParams.WatchedStorageSlots, storageSlots) - case SetStorageSlots: - err := sds.indexer.SetWatched(args, currentBlockNumber, WatchedStorageSlot) - if err != nil { - return err - } - - storageSlots := getStorageSlots(args) + // update in-memory params writeLoopParams.WatchedStorageSlots = storageSlots + case SetStorageSlots: + // get storage slots from args + argStorageSlots, ok := funk.Map(args, func(arg WatchAddressArg) common.Hash { + return common.HexToHash(arg.Address) + }).([]common.Hash) + if !ok { + return fmt.Errorf("SetStorageSlots: mapped storage slots %s", typeAssertionFailed) + } + + // update the db + err := sds.indexer.SetWatchedAddresses(args, currentBlockNumber, WatchedStorageSlot) + if err != nil { + return err + } + + // update in-memory params + writeLoopParams.WatchedStorageSlots = argStorageSlots case ClearStorageSlots: - err := sds.indexer.ClearWatched(WatchedStorageSlot) + err := sds.indexer.ClearWatchedAddresses(WatchedStorageSlot) if err != nil { return err }