From dc06991605f78b7bb7878fbab268d486fe168fd5 Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Mon, 8 Jul 2019 15:34:06 -0500 Subject: [PATCH] Wire up the streamer with a fetcher --- .../fetcher/geth_rpc_storage_fetcher.go | 69 +++++++++ .../fetcher/geth_rpc_storage_fetcher_test.go | 131 ++++++++++++++++++ libraries/shared/test_data/statediff.go | 128 +++++++++++++++++ 3 files changed, 328 insertions(+) create mode 100644 libraries/shared/fetcher/geth_rpc_storage_fetcher.go create mode 100644 libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go create mode 100644 libraries/shared/test_data/statediff.go diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go new file mode 100644 index 00000000..0a46f253 --- /dev/null +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher.go @@ -0,0 +1,69 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fetcher + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/streamer" +) + +type GethRpcStorageFetcher struct{ + statediffPayloadChan chan statediff.Payload + streamer streamer.Streamer +} + +func NewGethRpcStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRpcStorageFetcher{ + return GethRpcStorageFetcher{ + statediffPayloadChan: statediffPayloadChan, + streamer: streamer, + } +} + +func (fetcher *GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) { + ethStatediffPayloadChan := fetcher.statediffPayloadChan + _, err := fetcher.streamer.Stream(ethStatediffPayloadChan) + if err != nil { + errs <- err + } + + for diff := range ethStatediffPayloadChan { + stateDiff := new(statediff.StateDiff) + err = rlp.DecodeBytes(diff.StateDiffRlp, stateDiff) + if err != nil { + errs <- err + } + accounts := getAccountDiffs(*stateDiff) + + for _, account := range accounts { + for _, storage := range account.Storage { + out <- utils.StorageDiffRow{ + Contract: common.BytesToAddress(account.Key), + BlockHash: stateDiff.BlockHash, + BlockHeight: int(stateDiff.BlockNumber.Int64()), + StorageKey: common.BytesToHash(storage.Key), + StorageValue: common.BytesToHash(storage.Value), + } + } + } + } +} + +func getAccountDiffs(stateDiff statediff.StateDiff) []statediff.AccountDiff { + accounts :=append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...) + return append(accounts, stateDiff.DeletedAccounts...) +} diff --git a/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go new file mode 100644 index 00000000..a5bf3dd9 --- /dev/null +++ b/libraries/shared/fetcher/geth_rpc_storage_fetcher_test.go @@ -0,0 +1,131 @@ +// Copyright 2019 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package fetcher_test + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/vulcanize/vulcanizedb/libraries/shared/fetcher" + "github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils" + "github.com/vulcanize/vulcanizedb/libraries/shared/test_data" + "github.com/vulcanize/vulcanizedb/pkg/fakes" +) + +type MockStoragediffStreamer struct { + subscribeError error + PassedPayloadChan chan statediff.Payload + streamPayloads []statediff.Payload +} + +func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { + clientSubscription := rpc.ClientSubscription{} + streamer.PassedPayloadChan = statediffPayloadChan + + go func() { + for _, payload := range streamer.streamPayloads { + streamer.PassedPayloadChan <- payload + } + }() + + return &clientSubscription, streamer.subscribeError +} + +func (streamer *MockStoragediffStreamer) SetSubscribeError(err error) { + streamer.subscribeError = err +} + +func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payload) { + streamer.streamPayloads = payloads +} + +var _ = Describe("Geth RPC Storage Fetcher", func() { + var streamer MockStoragediffStreamer + var statediffPayloadChan chan statediff.Payload + var statediffFetcher fetcher.GethRpcStorageFetcher + var storagediffRowChan chan utils.StorageDiffRow + var errorChan chan error + + + BeforeEach(func() { + streamer = MockStoragediffStreamer{} + statediffPayloadChan = make(chan statediff.Payload, 1) + statediffFetcher = fetcher.NewGethRpcStorageFetcher(&streamer, statediffPayloadChan) + storagediffRowChan = make(chan utils.StorageDiffRow) + errorChan = make(chan error) + }) + + It("adds errors to error channel if the RPC subscription fails ", func(done Done) { + streamer.SetSubscribeError(fakes.FakeError) + + go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan) + + Expect(<-errorChan).To(MatchError(fakes.FakeError)) + close(done) + }) + + It("streams StatediffPayloads from a Geth RPC subscription", func(done Done) { + streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan) + + streamedPayload := <-statediffPayloadChan + Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload)) + Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan)) + close(done) + }) + + It("adds parsed statediff payloads to the rows channel", func(done Done) { + streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan) + + height := test_data.BlockNumber + intHeight := int(height.Int64()) + expectedStorageDiffRow := utils.StorageDiffRow{ + //this is not the contract address, but the keccak 256 of the address + Contract: common.BytesToAddress(test_data.ContractLeafKey[:]), + BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHeight: intHeight, + StorageKey: common.BytesToHash(test_data.StorageKey), + StorageValue: common.BytesToHash(test_data.StorageValue), + } + anotherExpectedStorageDiffRow := utils.StorageDiffRow{ + //this is not the contract address, but the keccak 256 of the address + Contract: common.BytesToAddress(test_data.AnotherContractLeafKey[:]), + BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"), + BlockHeight: intHeight, + StorageKey: common.BytesToHash(test_data.StorageKey), + StorageValue: common.BytesToHash(test_data.StorageValue), + } + Expect(<-storagediffRowChan).To(Equal(expectedStorageDiffRow)) + Expect(<-storagediffRowChan).To(Equal(anotherExpectedStorageDiffRow)) + + close(done) + }) + + It("adds errors to error channel if parsing the diff fails", func(done Done) { + badStatediffPayload := statediff.Payload{} + streamer.SetPayloads([]statediff.Payload{badStatediffPayload}) + + go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan) + + Expect(<-errorChan).To(MatchError("EOF")) + + close(done) + }) +}) \ No newline at end of file diff --git a/libraries/shared/test_data/statediff.go b/libraries/shared/test_data/statediff.go new file mode 100644 index 00000000..8231bbd4 --- /dev/null +++ b/libraries/shared/test_data/statediff.go @@ -0,0 +1,128 @@ +// Copyright 2018 Vulcanize +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package test_data + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff" + "math/big" + "math/rand" +) + +var ( + BlockNumber = big.NewInt(rand.Int63()) + BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73" + CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") + NewNonceValue = rand.Uint64() + NewBalanceValue = rand.Int63() + ContractRoot = common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421") + StoragePath = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes() + StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() + StorageValue = common.Hex2Bytes("0x03") + storage = []statediff.StorageDiff{{ + Key: StorageKey, + Value: StorageValue, + Path: StoragePath, + Proof: [][]byte{}, + }} + emptyStorage = make([]statediff.StorageDiff, 0) + contractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") + ContractLeafKey = crypto.Keccak256Hash(contractAddress[:]) + anotherContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") + AnotherContractLeafKey = crypto.Keccak256Hash(anotherContractAddress[:]) + + testAccount = state.Account{ + Nonce: NewNonceValue, + Balance: big.NewInt(NewBalanceValue), + Root: ContractRoot, + CodeHash: CodeHash, + } + valueBytes, _ = rlp.EncodeToBytes(testAccount) + CreatedAccountDiffs = []statediff.AccountDiff{ + { + Key: ContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storage, + }, + { + Key: AnotherContractLeafKey.Bytes(), + Value: valueBytes, + Storage: emptyStorage, + }, + } + + UpdatedAccountDiffs = []statediff.AccountDiff{{ + Key: AnotherContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storage, + }} + + DeletedAccountDiffs = []statediff.AccountDiff{{ + Key: ContractLeafKey.Bytes(), + Value: valueBytes, + Storage: storage, + }} + + MockStateDiff = statediff.StateDiff{ + BlockNumber: BlockNumber, + BlockHash: common.HexToHash(BlockHash), + CreatedAccounts: CreatedAccountDiffs, + DeletedAccounts: DeletedAccountDiffs, + UpdatedAccounts: UpdatedAccountDiffs, + } + MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) + + mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil) + mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil) + MockTransactions = types.Transactions{mockTransaction1, mockTransaction2} + + mockReceipt1 = types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50) + mockReceipt2 = types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100) + MockReceipts = types.Receipts{mockReceipt1, mockReceipt2} + + MockHeader = types.Header{ + Time: 0, + Number: BlockNumber, + Root: common.HexToHash("0x0"), + TxHash: common.HexToHash("0x0"), + ReceiptHash: common.HexToHash("0x0"), + } + MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts) + MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock) + + MockStatediffPayload = statediff.Payload{ + BlockRlp: MockBlockRlp, + StateDiffRlp: MockStateDiffBytes, + Err: nil, + } + + EmptyStatediffPayload = statediff.Payload{ + BlockRlp: []byte{}, + StateDiffRlp: []byte{}, + Err: nil, + } + + ErrStatediffPayload = statediff.Payload{ + BlockRlp: []byte{}, + StateDiffRlp: []byte{}, + Err: errors.New("mock error"), + } +) +