Wire up the streamer with a fetcher

This commit is contained in:
Elizabeth Engelman 2019-07-08 15:34:06 -05:00
parent 1b4a901892
commit dc06991605
3 changed files with 328 additions and 0 deletions

View File

@ -0,0 +1,69 @@
// Copyright 2019 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fetcher
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/streamer"
)
type GethRpcStorageFetcher struct{
statediffPayloadChan chan statediff.Payload
streamer streamer.Streamer
}
func NewGethRpcStorageFetcher(streamer streamer.Streamer, statediffPayloadChan chan statediff.Payload) GethRpcStorageFetcher{
return GethRpcStorageFetcher{
statediffPayloadChan: statediffPayloadChan,
streamer: streamer,
}
}
func (fetcher *GethRpcStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffRow, errs chan<- error) {
ethStatediffPayloadChan := fetcher.statediffPayloadChan
_, err := fetcher.streamer.Stream(ethStatediffPayloadChan)
if err != nil {
errs <- err
}
for diff := range ethStatediffPayloadChan {
stateDiff := new(statediff.StateDiff)
err = rlp.DecodeBytes(diff.StateDiffRlp, stateDiff)
if err != nil {
errs <- err
}
accounts := getAccountDiffs(*stateDiff)
for _, account := range accounts {
for _, storage := range account.Storage {
out <- utils.StorageDiffRow{
Contract: common.BytesToAddress(account.Key),
BlockHash: stateDiff.BlockHash,
BlockHeight: int(stateDiff.BlockNumber.Int64()),
StorageKey: common.BytesToHash(storage.Key),
StorageValue: common.BytesToHash(storage.Value),
}
}
}
}
}
func getAccountDiffs(stateDiff statediff.StateDiff) []statediff.AccountDiff {
accounts :=append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...)
return append(accounts, stateDiff.DeletedAccounts...)
}

View File

@ -0,0 +1,131 @@
// Copyright 2019 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fetcher_test
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/libraries/shared/fetcher"
"github.com/vulcanize/vulcanizedb/libraries/shared/storage/utils"
"github.com/vulcanize/vulcanizedb/libraries/shared/test_data"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
)
type MockStoragediffStreamer struct {
subscribeError error
PassedPayloadChan chan statediff.Payload
streamPayloads []statediff.Payload
}
func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) {
clientSubscription := rpc.ClientSubscription{}
streamer.PassedPayloadChan = statediffPayloadChan
go func() {
for _, payload := range streamer.streamPayloads {
streamer.PassedPayloadChan <- payload
}
}()
return &clientSubscription, streamer.subscribeError
}
func (streamer *MockStoragediffStreamer) SetSubscribeError(err error) {
streamer.subscribeError = err
}
func (streamer *MockStoragediffStreamer) SetPayloads(payloads []statediff.Payload) {
streamer.streamPayloads = payloads
}
var _ = Describe("Geth RPC Storage Fetcher", func() {
var streamer MockStoragediffStreamer
var statediffPayloadChan chan statediff.Payload
var statediffFetcher fetcher.GethRpcStorageFetcher
var storagediffRowChan chan utils.StorageDiffRow
var errorChan chan error
BeforeEach(func() {
streamer = MockStoragediffStreamer{}
statediffPayloadChan = make(chan statediff.Payload, 1)
statediffFetcher = fetcher.NewGethRpcStorageFetcher(&streamer, statediffPayloadChan)
storagediffRowChan = make(chan utils.StorageDiffRow)
errorChan = make(chan error)
})
It("adds errors to error channel if the RPC subscription fails ", func(done Done) {
streamer.SetSubscribeError(fakes.FakeError)
go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan)
Expect(<-errorChan).To(MatchError(fakes.FakeError))
close(done)
})
It("streams StatediffPayloads from a Geth RPC subscription", func(done Done) {
streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload})
go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan)
streamedPayload := <-statediffPayloadChan
Expect(streamedPayload).To(Equal(test_data.MockStatediffPayload))
Expect(streamer.PassedPayloadChan).To(Equal(statediffPayloadChan))
close(done)
})
It("adds parsed statediff payloads to the rows channel", func(done Done) {
streamer.SetPayloads([]statediff.Payload{test_data.MockStatediffPayload})
go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan)
height := test_data.BlockNumber
intHeight := int(height.Int64())
expectedStorageDiffRow := utils.StorageDiffRow{
//this is not the contract address, but the keccak 256 of the address
Contract: common.BytesToAddress(test_data.ContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: intHeight,
StorageKey: common.BytesToHash(test_data.StorageKey),
StorageValue: common.BytesToHash(test_data.StorageValue),
}
anotherExpectedStorageDiffRow := utils.StorageDiffRow{
//this is not the contract address, but the keccak 256 of the address
Contract: common.BytesToAddress(test_data.AnotherContractLeafKey[:]),
BlockHash: common.HexToHash("0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"),
BlockHeight: intHeight,
StorageKey: common.BytesToHash(test_data.StorageKey),
StorageValue: common.BytesToHash(test_data.StorageValue),
}
Expect(<-storagediffRowChan).To(Equal(expectedStorageDiffRow))
Expect(<-storagediffRowChan).To(Equal(anotherExpectedStorageDiffRow))
close(done)
})
It("adds errors to error channel if parsing the diff fails", func(done Done) {
badStatediffPayload := statediff.Payload{}
streamer.SetPayloads([]statediff.Payload{badStatediffPayload})
go statediffFetcher.FetchStorageDiffs(storagediffRowChan, errorChan)
Expect(<-errorChan).To(MatchError("EOF"))
close(done)
})
})

View File

@ -0,0 +1,128 @@
// Copyright 2018 Vulcanize
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package test_data
import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff"
"math/big"
"math/rand"
)
var (
BlockNumber = big.NewInt(rand.Int63())
BlockHash = "0xfa40fbe2d98d98b3363a778d52f2bcd29d6790b9b3f3cab2b167fd12d3550f73"
CodeHash = common.Hex2Bytes("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
NewNonceValue = rand.Uint64()
NewBalanceValue = rand.Int63()
ContractRoot = common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")
StoragePath = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes()
StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()
StorageValue = common.Hex2Bytes("0x03")
storage = []statediff.StorageDiff{{
Key: StorageKey,
Value: StorageValue,
Path: StoragePath,
Proof: [][]byte{},
}}
emptyStorage = make([]statediff.StorageDiff, 0)
contractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592")
ContractLeafKey = crypto.Keccak256Hash(contractAddress[:])
anotherContractAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593")
AnotherContractLeafKey = crypto.Keccak256Hash(anotherContractAddress[:])
testAccount = state.Account{
Nonce: NewNonceValue,
Balance: big.NewInt(NewBalanceValue),
Root: ContractRoot,
CodeHash: CodeHash,
}
valueBytes, _ = rlp.EncodeToBytes(testAccount)
CreatedAccountDiffs = []statediff.AccountDiff{
{
Key: ContractLeafKey.Bytes(),
Value: valueBytes,
Storage: storage,
},
{
Key: AnotherContractLeafKey.Bytes(),
Value: valueBytes,
Storage: emptyStorage,
},
}
UpdatedAccountDiffs = []statediff.AccountDiff{{
Key: AnotherContractLeafKey.Bytes(),
Value: valueBytes,
Storage: storage,
}}
DeletedAccountDiffs = []statediff.AccountDiff{{
Key: ContractLeafKey.Bytes(),
Value: valueBytes,
Storage: storage,
}}
MockStateDiff = statediff.StateDiff{
BlockNumber: BlockNumber,
BlockHash: common.HexToHash(BlockHash),
CreatedAccounts: CreatedAccountDiffs,
DeletedAccounts: DeletedAccountDiffs,
UpdatedAccounts: UpdatedAccountDiffs,
}
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
mockTransaction1 = types.NewTransaction(0, common.HexToAddress("0x0"), big.NewInt(1000), 50, big.NewInt(100), nil)
mockTransaction2 = types.NewTransaction(1, common.HexToAddress("0x1"), big.NewInt(2000), 100, big.NewInt(200), nil)
MockTransactions = types.Transactions{mockTransaction1, mockTransaction2}
mockReceipt1 = types.NewReceipt(common.HexToHash("0x0").Bytes(), false, 50)
mockReceipt2 = types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100)
MockReceipts = types.Receipts{mockReceipt1, mockReceipt2}
MockHeader = types.Header{
Time: 0,
Number: BlockNumber,
Root: common.HexToHash("0x0"),
TxHash: common.HexToHash("0x0"),
ReceiptHash: common.HexToHash("0x0"),
}
MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts)
MockBlockRlp, _ = rlp.EncodeToBytes(MockBlock)
MockStatediffPayload = statediff.Payload{
BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes,
Err: nil,
}
EmptyStatediffPayload = statediff.Payload{
BlockRlp: []byte{},
StateDiffRlp: []byte{},
Err: nil,
}
ErrStatediffPayload = statediff.Payload{
BlockRlp: []byte{},
StateDiffRlp: []byte{},
Err: errors.New("mock error"),
}
)