diff --git a/cmd/composeAndExecute.go b/cmd/composeAndExecute.go index 05549847..98ccd0ff 100644 --- a/cmd/composeAndExecute.go +++ b/cmd/composeAndExecute.go @@ -188,7 +188,7 @@ func composeAndExecute() { stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) - sw := watcher.NewGethStorageWatcher(storageFetcher, &db) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) @@ -196,7 +196,7 @@ func composeAndExecute() { log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewCsvStorageWatcher(storageFetcher, &db) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/cmd/execute.go b/cmd/execute.go index 6c82fd01..ae8230f2 100644 --- a/cmd/execute.go +++ b/cmd/execute.go @@ -132,7 +132,7 @@ func execute() { stateDiffStreamer := streamer.NewStateDiffStreamer(rpcClient) payloadChan := make(chan statediff.Payload) storageFetcher := fetcher.NewGethRpcStorageFetcher(&stateDiffStreamer, payloadChan) - sw := watcher.NewGethStorageWatcher(storageFetcher, &db) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) @@ -140,7 +140,7 @@ func execute() { log.Debug("fetching storage diffs from csv") tailer := fs.FileTailer{Path: storageDiffsPath} storageFetcher := fetcher.NewCsvTailStorageFetcher(tailer) - sw := watcher.NewCsvStorageWatcher(storageFetcher, &db) + sw := watcher.NewStorageWatcher(storageFetcher, &db) sw.AddTransformers(ethStorageInitializers) wg.Add(1) go watchEthStorage(&sw, &wg) diff --git a/go.mod b/go.mod index 4afac397..64c66ace 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff // indirect github.com/go-sql-driver/mysql v1.4.1 // indirect github.com/golang/protobuf v1.3.2 // indirect - github.com/google/uuid v1.0.0 // indirect github.com/howeyc/fsnotify v0.9.0 // indirect github.com/hpcloud/tail v1.0.0 github.com/huin/goupnp v1.0.0 // indirect diff --git a/libraries/shared/factories/storage/transformer.go b/libraries/shared/factories/storage/transformer.go index 240924bd..acd78b9e 100644 --- a/libraries/shared/factories/storage/transformer.go +++ b/libraries/shared/factories/storage/transformer.go @@ -18,7 +18,7 @@ package storage import ( "github.com/ethereum/go-ethereum/common" - + "github.com/vulcanize/vulcanizedb/libraries/shared/repository" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" @@ -26,9 +26,9 @@ import ( ) type Transformer struct { - Address common.Address - Mappings storage.Mappings - Repository Repository + HashedAddress common.Hash + Mappings storage.Mappings + Repository repository.StorageRepository } func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.StorageTransformer { @@ -37,8 +37,8 @@ func (transformer Transformer) NewTransformer(db *postgres.DB) transformer.Stora return transformer } -func (transformer Transformer) ContractAddress() common.Address { - return transformer.Address +func (transformer Transformer) KeccakContractAddress() common.Hash { + return transformer.HashedAddress } func (transformer Transformer) Execute(diff utils.StorageDiff) error { diff --git a/libraries/shared/factories/storage/transformer_test.go b/libraries/shared/factories/storage/transformer_test.go index ab1c4188..b67ad02d 100644 --- a/libraries/shared/factories/storage/transformer_test.go +++ b/libraries/shared/factories/storage/transformer_test.go @@ -18,9 +18,9 @@ package storage_test import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/factories/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" @@ -38,17 +38,17 @@ var _ = Describe("Storage transformer", func() { mappings = &mocks.MockMappings{} repository = &mocks.MockStorageRepository{} t = storage.Transformer{ - Address: common.Address{}, - Mappings: mappings, - Repository: repository, + HashedAddress: common.Hash{}, + Mappings: mappings, + Repository: repository, } }) It("returns the contract address being watched", func() { - fakeAddress := common.HexToAddress("0x12345") - t.Address = fakeAddress + fakeAddress := common.BytesToHash(crypto.Keccak256(common.FromHex("0x12345"))) + t.HashedAddress = fakeAddress - Expect(t.ContractAddress()).To(Equal(fakeAddress)) + Expect(t.KeccakContractAddress()).To(Equal(fakeAddress)) }) It("looks up metadata for storage key", func() { @@ -73,11 +73,11 @@ var _ = Describe("Storage transformer", func() { fakeBlockNumber := 123 fakeBlockHash := "0x67890" fakeRow := utils.StorageDiff{ - Contract: common.Address{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + KeccakOfContractAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), } err := t.Execute(fakeRow) @@ -121,11 +121,11 @@ var _ = Describe("Storage transformer", func() { It("passes the decoded data items to the repository", func() { mappings.Metadata = fakeMetadata fakeRow := utils.StorageDiff{ - Contract: common.Address{}, - BlockHash: common.HexToHash(fakeBlockHash), - BlockHeight: fakeBlockNumber, - StorageKey: common.Hash{}, - StorageValue: rawValue.Hash(), + KeccakOfContractAddress: common.Hash{}, + BlockHash: common.HexToHash(fakeBlockHash), + BlockHeight: fakeBlockNumber, + StorageKey: common.Hash{}, + StorageValue: rawValue.Hash(), } err := t.Execute(fakeRow) diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher.go b/libraries/shared/fetcher/csv_tail_storage_fetcher.go index 1528a1c7..01dc3caa 100644 --- a/libraries/shared/fetcher/csv_tail_storage_fetcher.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher.go @@ -17,12 +17,10 @@ package fetcher import ( - "strings" - - log "github.com/sirupsen/logrus" - + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/fs" + "strings" ) type CsvTailStorageFetcher struct { @@ -38,9 +36,9 @@ func (storageFetcher CsvTailStorageFetcher) FetchStorageDiffs(out chan<- utils.S if tailErr != nil { errs <- tailErr } - log.Debug("fetching storage diffs...") + logrus.Debug("fetching storage diffs...") for line := range t.Lines { - diff, parseErr := utils.FromStrings(strings.Split(line.Text, ",")) + diff, parseErr := utils.FromParityCsvRow(strings.Split(line.Text, ",")) if parseErr != nil { errs <- parseErr } else { diff --git a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go index 22a50bba..eaca0eb3 100644 --- a/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go +++ b/libraries/shared/fetcher/csv_tail_storage_fetcher_test.go @@ -18,17 +18,15 @@ package fetcher_test import ( "fmt" - "strings" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/hpcloud/tail" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/fakes" + "strings" + "time" ) var _ = Describe("Csv Tail Storage Fetcher", func() { @@ -61,7 +59,7 @@ var _ = Describe("Csv Tail Storage Fetcher", func() { go storageFetcher.FetchStorageDiffs(diffsChannel, errorsChannel) mockTailer.Lines <- line - expectedRow, err := utils.FromStrings(strings.Split(line.Text, ",")) + expectedRow, err := utils.FromParityCsvRow(strings.Split(line.Text, ",")) Expect(err).NotTo(HaveOccurred()) Expect(<-diffsChannel).To(Equal(expectedRow)) close(done) diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go index 58a77c68..de264328 100644 --- a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -16,7 +16,6 @@ package fetcher import ( "fmt" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" @@ -61,13 +60,7 @@ func (fetcher GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account", len(account.Storage))) for _, storage := range account.Storage { logrus.Trace("adding storage diff to out channel") - out <- utils.StorageDiff{ - KeccakOfContractAddress: common.BytesToHash(account.Key), - BlockHash: stateDiff.BlockHash, - BlockHeight: int(stateDiff.BlockNumber.Int64()), - StorageKey: common.BytesToHash(storage.Key), - StorageValue: common.BytesToHash(storage.Value), - } + out <- utils.FromGethStateDiff(account, stateDiff, storage) } } } diff --git a/libraries/shared/mocks/storage_transformer.go b/libraries/shared/mocks/storage_transformer.go index 18599847..b1c3cba2 100644 --- a/libraries/shared/mocks/storage_transformer.go +++ b/libraries/shared/mocks/storage_transformer.go @@ -18,16 +18,15 @@ package mocks import ( "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type MockStorageTransformer struct { - Address common.Address - ExecuteErr error - PassedDiff utils.StorageDiff + KeccakOfAddress common.Hash + ExecuteErr error + PassedDiff utils.StorageDiff } func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error { @@ -35,8 +34,8 @@ func (transformer *MockStorageTransformer) Execute(diff utils.StorageDiff) error return transformer.ExecuteErr } -func (transformer *MockStorageTransformer) ContractAddress() common.Address { - return transformer.Address +func (transformer *MockStorageTransformer) KeccakContractAddress() common.Hash { + return transformer.KeccakOfAddress } func (transformer *MockStorageTransformer) FakeTransformerInitializer(db *postgres.DB) transformer.StorageTransformer { diff --git a/libraries/shared/storage/storage_queue.go b/libraries/shared/storage/storage_queue.go index a91b9e94..4af0c476 100644 --- a/libraries/shared/storage/storage_queue.go +++ b/libraries/shared/storage/storage_queue.go @@ -38,7 +38,7 @@ func NewStorageQueue(db *postgres.DB) StorageQueue { func (queue StorageQueue) Add(diff utils.StorageDiff) error { _, err := queue.db.Exec(`INSERT INTO public.queued_storage (contract, block_hash, block_height, storage_key, storage_value) VALUES - ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, diff.Contract.Bytes(), diff.BlockHash.Bytes(), + ($1, $2, $3, $4, $5) ON CONFLICT DO NOTHING`, diff.KeccakOfContractAddress.Bytes(), diff.BlockHash.Bytes(), diff.BlockHeight, diff.StorageKey.Bytes(), diff.StorageValue.Bytes()) return err } diff --git a/libraries/shared/storage/storage_queue_test.go b/libraries/shared/storage/storage_queue_test.go index 6da1af34..693f31d0 100644 --- a/libraries/shared/storage/storage_queue_test.go +++ b/libraries/shared/storage/storage_queue_test.go @@ -18,9 +18,9 @@ package storage_test import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" @@ -35,12 +35,14 @@ var _ = Describe("Storage queue", func() { ) BeforeEach(func() { + fakeAddr := common.FromHex("0x123456") + hashedFakeAddr := crypto.Keccak256(fakeAddr) diff = utils.StorageDiff{ - Contract: common.HexToAddress("0x123456"), - BlockHash: common.HexToHash("0x678901"), - BlockHeight: 987, - StorageKey: common.HexToHash("0x654321"), - StorageValue: common.HexToHash("0x198765"), + KeccakOfContractAddress: common.BytesToHash(hashedFakeAddr), + BlockHash: common.HexToHash("0x678901"), + BlockHeight: 987, + StorageKey: common.HexToHash("0x654321"), + StorageValue: common.HexToHash("0x198765"), } db = test_config.NewTestDB(test_config.NewTestNode()) test_config.CleanTestDB(db) @@ -81,12 +83,14 @@ var _ = Describe("Storage queue", func() { }) It("gets all storage diffs from db", func() { + fakeAddr := common.FromHex("0x234567") + hashedFakeAddr := crypto.Keccak256(fakeAddr) diffTwo := utils.StorageDiff{ - Contract: common.HexToAddress("0x123456"), - BlockHash: common.HexToHash("0x678902"), - BlockHeight: 988, - StorageKey: common.HexToHash("0x654322"), - StorageValue: common.HexToHash("0x198766"), + KeccakOfContractAddress: common.BytesToHash(hashedFakeAddr), + BlockHash: common.HexToHash("0x678902"), + BlockHeight: 988, + StorageKey: common.HexToHash("0x654322"), + StorageValue: common.HexToHash("0x198766"), } addErr := queue.Add(diffTwo) Expect(addErr).NotTo(HaveOccurred()) @@ -97,13 +101,13 @@ var _ = Describe("Storage queue", func() { Expect(len(diffs)).To(Equal(2)) Expect(diffs[0]).NotTo(Equal(diffs[1])) Expect(diffs[0].Id).NotTo(BeZero()) - Expect(diffs[0].Contract).To(Or(Equal(diff.Contract), Equal(diffTwo.Contract))) + Expect(diffs[0].KeccakOfContractAddress).To(Or(Equal(diff.KeccakOfContractAddress), Equal(diffTwo.KeccakOfContractAddress))) Expect(diffs[0].BlockHash).To(Or(Equal(diff.BlockHash), Equal(diffTwo.BlockHash))) Expect(diffs[0].BlockHeight).To(Or(Equal(diff.BlockHeight), Equal(diffTwo.BlockHeight))) Expect(diffs[0].StorageKey).To(Or(Equal(diff.StorageKey), Equal(diffTwo.StorageKey))) Expect(diffs[0].StorageValue).To(Or(Equal(diff.StorageValue), Equal(diffTwo.StorageValue))) Expect(diffs[1].Id).NotTo(BeZero()) - Expect(diffs[1].Contract).To(Or(Equal(diff.Contract), Equal(diffTwo.Contract))) + Expect(diffs[1].KeccakOfContractAddress).To(Or(Equal(diff.KeccakOfContractAddress), Equal(diffTwo.KeccakOfContractAddress))) Expect(diffs[1].BlockHash).To(Or(Equal(diff.BlockHash), Equal(diffTwo.BlockHash))) Expect(diffs[1].BlockHeight).To(Or(Equal(diff.BlockHeight), Equal(diffTwo.BlockHeight))) Expect(diffs[1].StorageKey).To(Or(Equal(diff.StorageKey), Equal(diffTwo.StorageKey))) diff --git a/libraries/shared/storage/utils/diff.go b/libraries/shared/storage/utils/diff.go index 7dddfb4a..720ef197 100644 --- a/libraries/shared/storage/utils/diff.go +++ b/libraries/shared/storage/utils/diff.go @@ -17,24 +17,24 @@ package utils import ( - "strconv" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/statediff" + "strconv" ) const ExpectedRowLength = 5 type StorageDiff struct { Id int - Contract common.Address - KeccakOfContractAddress common.Hash + KeccakOfContractAddress common.Hash `db:"contract"` BlockHash common.Hash `db:"block_hash"` BlockHeight int `db:"block_height"` StorageKey common.Hash `db:"storage_key"` StorageValue common.Hash `db:"storage_value"` } -func FromStrings(csvRow []string) (StorageDiff, error) { +func FromParityCsvRow(csvRow []string) (StorageDiff, error) { if len(csvRow) != ExpectedRowLength { return StorageDiff{}, ErrRowMalformed{Length: len(csvRow)} } @@ -42,11 +42,22 @@ func FromStrings(csvRow []string) (StorageDiff, error) { if err != nil { return StorageDiff{}, err } + hashedAddr := crypto.Keccak256(common.FromHex(csvRow[0])) return StorageDiff{ - Contract: common.HexToAddress(csvRow[0]), - BlockHash: common.HexToHash(csvRow[1]), - BlockHeight: height, - StorageKey: common.HexToHash(csvRow[3]), - StorageValue: common.HexToHash(csvRow[4]), + KeccakOfContractAddress: common.BytesToHash(hashedAddr), + BlockHash: common.HexToHash(csvRow[1]), + BlockHeight: height, + StorageKey: common.HexToHash(csvRow[3]), + StorageValue: common.HexToHash(csvRow[4]), }, nil } + +func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) StorageDiff { + return StorageDiff{ + KeccakOfContractAddress: common.BytesToHash(account.Key), + BlockHash: stateDiff.BlockHash, + BlockHeight: int(stateDiff.BlockNumber.Int64()), + StorageKey: common.BytesToHash(storage.Key), + StorageValue: common.BytesToHash(storage.Value), + } +} diff --git a/libraries/shared/storage/utils/diff_test.go b/libraries/shared/storage/utils/diff_test.go index ffd49fbf..bcb0f49c 100644 --- a/libraries/shared/storage/utils/diff_test.go +++ b/libraries/shared/storage/utils/diff_test.go @@ -18,41 +18,74 @@ package utils_test import ( "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/pkg/fakes" + "math/big" + "math/rand" ) var _ = Describe("Storage row parsing", func() { - It("converts an array of strings to a row struct", func() { - contract := "0x123" - blockHash := "0x456" - blockHeight := "789" - storageKey := "0x987" - storageValue := "0x654" - data := []string{contract, blockHash, blockHeight, storageKey, storageValue} + Describe("FromParityCsvRow", func() { + It("converts an array of strings to a row struct", func() { + contract := "0x123" + blockHash := "0x456" + blockHeight := "789" + storageKey := "0x987" + storageValue := "0x654" + data := []string{contract, blockHash, blockHeight, storageKey, storageValue} - result, err := utils.FromStrings(data) + result, err := utils.FromParityCsvRow(data) - Expect(err).NotTo(HaveOccurred()) - Expect(result.Contract).To(Equal(common.HexToAddress(contract))) - Expect(result.BlockHash).To(Equal(common.HexToHash(blockHash))) - Expect(result.BlockHeight).To(Equal(789)) - Expect(result.StorageKey).To(Equal(common.HexToHash(storageKey))) - Expect(result.StorageValue).To(Equal(common.HexToHash(storageValue))) + Expect(err).NotTo(HaveOccurred()) + expectedKeccakOfContractAddress := common.BytesToHash(crypto.Keccak256(common.FromHex(contract))) + Expect(result.KeccakOfContractAddress).To(Equal(expectedKeccakOfContractAddress)) + Expect(result.BlockHash).To(Equal(common.HexToHash(blockHash))) + Expect(result.BlockHeight).To(Equal(789)) + Expect(result.StorageKey).To(Equal(common.HexToHash(storageKey))) + Expect(result.StorageValue).To(Equal(common.HexToHash(storageValue))) + }) + + It("returns an error if row is missing data", func() { + _, err := utils.FromParityCsvRow([]string{"0x123"}) + + Expect(err).To(HaveOccurred()) + Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) + }) + + It("returns error if block height malformed", func() { + _, err := utils.FromParityCsvRow([]string{"", "", "", "", ""}) + + Expect(err).To(HaveOccurred()) + }) }) - It("returns an error if row is missing data", func() { - _, err := utils.FromStrings([]string{"0x123"}) + Describe("FromGethStateDiff", func() { + It("adds relevant fields to diff", func() { + accountDiff := statediff.AccountDiff{Key: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}} + stateDiff := &statediff.StateDiff{ + BlockNumber: big.NewInt(rand.Int63()), + BlockHash: fakes.FakeHash, + } + storageDiff := statediff.StorageDiff{ + Key: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, + Value: []byte{1, 2, 3, 4, 5, 0, 9, 8, 7, 6}, + } - Expect(err).To(HaveOccurred()) - Expect(err).To(MatchError(utils.ErrRowMalformed{Length: 1})) - }) + result := utils.FromGethStateDiff(accountDiff, stateDiff, storageDiff) - It("returns error if block height malformed", func() { - _, err := utils.FromStrings([]string{"", "", "", "", ""}) - - Expect(err).To(HaveOccurred()) + expectedAddress := common.BytesToHash(accountDiff.Key) + Expect(result.KeccakOfContractAddress).To(Equal(expectedAddress)) + Expect(result.BlockHash).To(Equal(fakes.FakeHash)) + expectedBlockHeight := int(stateDiff.BlockNumber.Int64()) + Expect(result.BlockHeight).To(Equal(expectedBlockHeight)) + expectedStorageKey := common.BytesToHash(storageDiff.Key) + Expect(result.StorageKey).To(Equal(expectedStorageKey)) + expectedStorageValue := common.BytesToHash(storageDiff.Value) + Expect(result.StorageValue).To(Equal(expectedStorageValue)) + }) }) }) diff --git a/libraries/shared/transformer/storage_transformer.go b/libraries/shared/transformer/storage_transformer.go index 96b3f35c..698ef841 100644 --- a/libraries/shared/transformer/storage_transformer.go +++ b/libraries/shared/transformer/storage_transformer.go @@ -18,14 +18,13 @@ package transformer import ( "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" ) type StorageTransformer interface { Execute(diff utils.StorageDiff) error - ContractAddress() common.Address + KeccakContractAddress() common.Hash } type StorageTransformerInitializer func(db *postgres.DB) StorageTransformer diff --git a/libraries/shared/watcher/csv_storage_watcher.go b/libraries/shared/watcher/csv_storage_watcher.go deleted file mode 100644 index c85f19ad..00000000 --- a/libraries/shared/watcher/csv_storage_watcher.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2018 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package watcher - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" -) - -type CsvStorageWatcher struct { - StorageWatcher -} - -func NewCsvStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) CsvStorageWatcher { - queue := storage.NewStorageQueue(db) - transformers := make(map[common.Address]transformer.StorageTransformer) - storageWatcher := StorageWatcher{ - db: db, - StorageFetcher: fetcher, - Queue: queue, - Transformers: transformers, - } - storageWatcher.transformerGetter = storageWatcher.getCsvTransformer - return CsvStorageWatcher{StorageWatcher: storageWatcher} -} - -func (storageWatcher StorageWatcher) getCsvTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { - storageTransformer, ok := storageWatcher.Transformers[diff.Contract] - return storageTransformer, ok -} diff --git a/libraries/shared/watcher/geth_storage_watcher.go b/libraries/shared/watcher/geth_storage_watcher.go deleted file mode 100644 index d4138726..00000000 --- a/libraries/shared/watcher/geth_storage_watcher.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2018 Vulcanize -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package watcher - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" -) - -type GethStorageWatcher struct { - StorageWatcher -} - -func NewGethStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) GethStorageWatcher { - queue := storage.NewStorageQueue(db) - transformers := make(map[common.Address]transformer.StorageTransformer) - keccakAddressTransformers := make(map[common.Hash]transformer.StorageTransformer) - storageWatcher := StorageWatcher{ - db: db, - StorageFetcher: fetcher, - Queue: queue, - Transformers: transformers, - KeccakAddressTransformers: keccakAddressTransformers, - } - storageWatcher.transformerGetter = storageWatcher.getTransformerForGethWatcher - return GethStorageWatcher{StorageWatcher: storageWatcher} -} - -func (storageWatcher StorageWatcher) getTransformerForGethWatcher(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { - keccakOfAddress := diff.KeccakOfContractAddress - storageTransformer, ok := storageWatcher.KeccakAddressTransformers[keccakOfAddress] - if ok { - return storageTransformer, ok - } else { - for address, transformer := range storageWatcher.Transformers { - keccakOfTransformerAddress := common.BytesToHash(crypto.Keccak256(address[:])) - if keccakOfTransformerAddress == keccakOfAddress { - storageWatcher.KeccakAddressTransformers[keccakOfAddress] = transformer - return transformer, true - } - } - } - - return nil, false -} diff --git a/libraries/shared/watcher/geth_storage_watcher_test.go b/libraries/shared/watcher/geth_storage_watcher_test.go deleted file mode 100644 index cb46ac8e..00000000 --- a/libraries/shared/watcher/geth_storage_watcher_test.go +++ /dev/null @@ -1,269 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package watcher_test - -import ( - "github.com/ethereum/go-ethereum/crypto" - "io/ioutil" - "os" - "time" - - "github.com/ethereum/go-ethereum/common" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/sirupsen/logrus" - - "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" - "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" - "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" - "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" - "github.com/vulcanize/vulcanizedb/pkg/fakes" - "github.com/vulcanize/vulcanizedb/test_config" -) - -var _ = Describe("Geth Storage Watcher", func() { - It("adds transformers", func() { - fakeAddress := common.HexToAddress("0x12345") - fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewGethStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) - - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - - Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) - }) - - Describe("executing watcher", func() { - var ( - errs chan error - mockFetcher *mocks.MockStorageFetcher - mockQueue *mocks.MockStorageQueue - mockTransformer *mocks.MockStorageTransformer - gethDiff utils.StorageDiff - diffs chan utils.StorageDiff - storageWatcher watcher.GethStorageWatcher - address common.Address - keccakOfAddress common.Hash - ) - - BeforeEach(func() { - errs = make(chan error) - diffs = make(chan utils.StorageDiff) - address = common.HexToAddress("0x0123456789abcdef") - keccakOfAddress = common.BytesToHash(crypto.Keccak256(address[:])) - mockFetcher = mocks.NewMockStorageFetcher() - mockQueue = &mocks.MockStorageQueue{} - mockTransformer = &mocks.MockStorageTransformer{Address: address} - gethDiff = utils.StorageDiff{ - Id: 1338, - KeccakOfContractAddress: keccakOfAddress, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } - }) - - It("logs error if fetching storage diffs fails", func(done Done) { - mockFetcher.ErrsToReturn = []error{fakes.FakeError} - storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - Describe("transforming new storage diffs", func() { - BeforeEach(func() { - mockFetcher.DiffsToReturn = []utils.StorageDiff{gethDiff} - storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for recognized storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("queues diff for later processing if transformer execution fails", func(done Done) { - mockTransformer.ExecuteErr = fakes.FakeError - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Expect(<-errs).To(BeNil()) - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() utils.StorageDiff { - return mockQueue.AddPassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("logs error if queueing diff fails", func(done Done) { - mockTransformer.ExecuteErr = utils.ErrStorageKeyNotFound{} - mockQueue.AddError = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() bool { - return mockQueue.AddCalled - }).Should(BeTrue()) - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("keeps track transformers by the keccak256 hash of their contract address ", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Hour) - - m := make(map[common.Hash]transformer.StorageTransformer) - m[keccakOfAddress] = mockTransformer - - Eventually(func() map[common.Hash]transformer.StorageTransformer { - return storageWatcher.KeccakAddressTransformers - }).Should(Equal(m)) - - close(done) - }) - - It("gets the transformer from the known keccak address map first", func(done Done) { - anotherAddress := common.HexToAddress("0xafakeaddress") - anotherTransformer := &mocks.MockStorageTransformer{Address: anotherAddress} - keccakOfAnotherAddress := common.BytesToHash(crypto.Keccak256(anotherAddress[:])) - - anotherGethDiff := utils.StorageDiff{ - Id: 1338, - KeccakOfContractAddress: keccakOfAnotherAddress, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), - } - mockFetcher.DiffsToReturn = []utils.StorageDiff{anotherGethDiff} - storageWatcher.KeccakAddressTransformers[keccakOfAnotherAddress] = anotherTransformer - - go storageWatcher.Execute(diffs, errs, time.Hour) - - Eventually(func() utils.StorageDiff { - return anotherTransformer.PassedDiff - }).Should(Equal(anotherGethDiff)) - - close(done) - }) - }) - - Describe("transforming queued storage diffs", func() { - BeforeEach(func() { - mockQueue.DiffsToReturn = []utils.StorageDiff{gethDiff} - storageWatcher = watcher.NewGethStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) - storageWatcher.Queue = mockQueue - storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) - }) - - It("executes transformer for storage diff", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() utils.StorageDiff { - return mockTransformer.PassedDiff - }).Should(Equal(gethDiff)) - close(done) - }) - - It("deletes diff from queue if transformer execution successful", func(done Done) { - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(gethDiff.Id)) - close(done) - }) - - It("logs error if deleting persisted diff fails", func(done Done) { - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - - It("deletes obsolete diff from queue if contract not recognized", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: gethDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() int { - return mockQueue.DeletePassedId - }).Should(Equal(obsoleteDiff.Id)) - close(done) - }) - - It("logs error if deleting obsolete diff fails", func(done Done) { - obsoleteDiff := utils.StorageDiff{ - Id: gethDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), - } - mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} - mockQueue.DeleteErr = fakes.FakeError - tempFile, fileErr := ioutil.TempFile("", "log") - Expect(fileErr).NotTo(HaveOccurred()) - defer os.Remove(tempFile.Name()) - logrus.SetOutput(tempFile) - - go storageWatcher.Execute(diffs, errs, time.Nanosecond) - - Eventually(func() (string, error) { - logContent, err := ioutil.ReadFile(tempFile.Name()) - return string(logContent), err - }).Should(ContainSubstring(fakes.FakeError.Error())) - close(done) - }) - }) - }) -}) diff --git a/libraries/shared/watcher/storage_watcher.go b/libraries/shared/watcher/storage_watcher.go index d36da1a0..64ea29b1 100644 --- a/libraries/shared/watcher/storage_watcher.go +++ b/libraries/shared/watcher/storage_watcher.go @@ -18,17 +18,14 @@ package watcher import ( "fmt" - "reflect" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" "github.com/vulcanize/vulcanizedb/libraries/shared/storage" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" + "time" ) type IStorageWatcher interface { @@ -40,15 +37,24 @@ type StorageWatcher struct { db *postgres.DB StorageFetcher fetcher.IStorageFetcher Queue storage.IStorageQueue - Transformers map[common.Address]transformer.StorageTransformer KeccakAddressTransformers map[common.Hash]transformer.StorageTransformer // keccak hash of an address => transformer - transformerGetter func(diff utils.StorageDiff) (transformer.StorageTransformer, bool) +} + +func NewStorageWatcher(fetcher fetcher.IStorageFetcher, db *postgres.DB) StorageWatcher { + queue := storage.NewStorageQueue(db) + transformers := make(map[common.Hash]transformer.StorageTransformer) + return StorageWatcher{ + db: db, + StorageFetcher: fetcher, + Queue: queue, + KeccakAddressTransformers: transformers, + } } func (storageWatcher StorageWatcher) AddTransformers(initializers []transformer.StorageTransformerInitializer) { for _, initializer := range initializers { storageTransformer := initializer(storageWatcher.db) - storageWatcher.Transformers[storageTransformer.ContractAddress()] = storageTransformer + storageWatcher.KeccakAddressTransformers[storageTransformer.KeccakContractAddress()] = storageTransformer } } @@ -68,7 +74,8 @@ func (storageWatcher StorageWatcher) Execute(diffsChan chan utils.StorageDiff, e } func (storageWatcher StorageWatcher) getTransformer(diff utils.StorageDiff) (transformer.StorageTransformer, bool) { - return storageWatcher.transformerGetter(diff) + storageTransformer, ok := storageWatcher.KeccakAddressTransformers[diff.KeccakOfContractAddress] + return storageTransformer, ok } func (storageWatcher StorageWatcher) processRow(diff utils.StorageDiff) { @@ -112,7 +119,3 @@ func (storageWatcher StorageWatcher) deleteRow(id int) { logrus.Warn(fmt.Sprintf("error deleting persisted diff from queue: %s", deleteErr)) } } - -func isKeyNotFound(executeErr error) bool { - return reflect.TypeOf(executeErr) == reflect.TypeOf(utils.ErrStorageKeyNotFound{}) -} diff --git a/libraries/shared/watcher/csv_storage_watcher_test.go b/libraries/shared/watcher/storage_watcher_test.go similarity index 77% rename from libraries/shared/watcher/csv_storage_watcher_test.go rename to libraries/shared/watcher/storage_watcher_test.go index f8617cdf..98b30aa8 100644 --- a/libraries/shared/watcher/csv_storage_watcher_test.go +++ b/libraries/shared/watcher/storage_watcher_test.go @@ -17,35 +17,36 @@ package watcher_test import ( - "io/ioutil" - "os" - "time" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/crypto" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/libraries/shared/mocks" "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" "github.com/vulcanize/vulcanizedb/libraries/shared/transformer" "github.com/vulcanize/vulcanizedb/libraries/shared/watcher" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" + "io/ioutil" + "os" + "time" ) -var _ = Describe("Csv Storage Watcher", func() { - It("adds transformers", func() { - fakeAddress := common.HexToAddress("0x12345") - fakeTransformer := &mocks.MockStorageTransformer{Address: fakeAddress} - w := watcher.NewCsvStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) +var _ = Describe("Storage Watcher", func() { + Describe("AddTransformer", func() { + It("adds transformers", func() { + fakeHashedAddress := common.BytesToHash(crypto.Keccak256(common.FromHex("0x12345"))) + fakeTransformer := &mocks.MockStorageTransformer{KeccakOfAddress: fakeHashedAddress} + w := watcher.NewStorageWatcher(mocks.NewMockStorageFetcher(), test_config.NewTestDB(test_config.NewTestNode())) - w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) + w.AddTransformers([]transformer.StorageTransformerInitializer{fakeTransformer.FakeTransformerInitializer}) - Expect(w.Transformers[fakeAddress]).To(Equal(fakeTransformer)) + Expect(w.KeccakAddressTransformers[fakeHashedAddress]).To(Equal(fakeTransformer)) + }) }) - Describe("executing watcher", func() { + Describe("Execute", func() { var ( errs chan error mockFetcher *mocks.MockStorageFetcher @@ -53,30 +54,30 @@ var _ = Describe("Csv Storage Watcher", func() { mockTransformer *mocks.MockStorageTransformer csvDiff utils.StorageDiff diffs chan utils.StorageDiff - storageWatcher watcher.CsvStorageWatcher - address common.Address + storageWatcher watcher.StorageWatcher + hashedAddress common.Hash ) BeforeEach(func() { errs = make(chan error) diffs = make(chan utils.StorageDiff) - address = common.HexToAddress("0x0123456789abcdef") + hashedAddress = common.BytesToHash(crypto.Keccak256(common.FromHex("0x0123456789abcdef"))) mockFetcher = mocks.NewMockStorageFetcher() mockQueue = &mocks.MockStorageQueue{} - mockTransformer = &mocks.MockStorageTransformer{Address: address} + mockTransformer = &mocks.MockStorageTransformer{KeccakOfAddress: hashedAddress} csvDiff = utils.StorageDiff{ - Id: 1337, - Contract: address, - BlockHash: common.HexToHash("0xfedcba9876543210"), - BlockHeight: 0, - StorageKey: common.HexToHash("0xabcdef1234567890"), - StorageValue: common.HexToHash("0x9876543210abcdef"), + Id: 1337, + KeccakOfContractAddress: hashedAddress, + BlockHash: common.HexToHash("0xfedcba9876543210"), + BlockHeight: 0, + StorageKey: common.HexToHash("0xabcdef1234567890"), + StorageValue: common.HexToHash("0x9876543210abcdef"), } }) It("logs error if fetching storage diffs fails", func(done Done) { mockFetcher.ErrsToReturn = []error{fakes.FakeError} - storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) tempFile, fileErr := ioutil.TempFile("", "log") @@ -96,7 +97,7 @@ var _ = Describe("Csv Storage Watcher", func() { Describe("transforming new storage diffs from csv", func() { BeforeEach(func() { mockFetcher.DiffsToReturn = []utils.StorageDiff{csvDiff} - storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) }) @@ -149,7 +150,7 @@ var _ = Describe("Csv Storage Watcher", func() { Describe("transforming queued storage diffs", func() { BeforeEach(func() { mockQueue.DiffsToReturn = []utils.StorageDiff{csvDiff} - storageWatcher = watcher.NewCsvStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) + storageWatcher = watcher.NewStorageWatcher(mockFetcher, test_config.NewTestDB(test_config.NewTestNode())) storageWatcher.Queue = mockQueue storageWatcher.AddTransformers([]transformer.StorageTransformerInitializer{mockTransformer.FakeTransformerInitializer}) }) @@ -190,8 +191,8 @@ var _ = Describe("Csv Storage Watcher", func() { It("deletes obsolete diff from queue if contract not recognized", func(done Done) { obsoleteDiff := utils.StorageDiff{ - Id: csvDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), + Id: csvDiff.Id + 1, + KeccakOfContractAddress: common.BytesToHash(crypto.Keccak256(common.FromHex("0xfedcba9876543210"))), } mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} @@ -205,8 +206,8 @@ var _ = Describe("Csv Storage Watcher", func() { It("logs error if deleting obsolete diff fails", func(done Done) { obsoleteDiff := utils.StorageDiff{ - Id: csvDiff.Id + 1, - Contract: common.HexToAddress("0xfedcba9876543210"), + Id: csvDiff.Id + 1, + KeccakOfContractAddress: common.BytesToHash(crypto.Keccak256(common.FromHex("0xfedcba9876543210"))), } mockQueue.DiffsToReturn = []utils.StorageDiff{obsoleteDiff} mockQueue.DeleteErr = fakes.FakeError