pair with new statediffing geth version; travis tests will fail til release is up

This commit is contained in:
Ian Norden 2020-05-19 15:09:30 -05:00
parent 5fb1cc0696
commit 0101c4791a
26 changed files with 169 additions and 180 deletions

View File

@ -72,6 +72,7 @@ func superNode() {
if err != nil { if err != nil {
logWithCommand.Fatal(err) logWithCommand.Fatal(err)
} }
defer superNode.Stop()
var forwardPayloadChan chan shared.ConvertedData var forwardPayloadChan chan shared.ConvertedData
if superNodeConfig.Serve { if superNodeConfig.Serve {
logWithCommand.Info("starting up super node servers") logWithCommand.Info("starting up super node servers")
@ -100,7 +101,9 @@ func superNode() {
shutdown := make(chan os.Signal) shutdown := make(chan os.Signal)
signal.Notify(shutdown, os.Interrupt) signal.Notify(shutdown, os.Interrupt)
<-shutdown <-shutdown
if superNodeConfig.BackFill {
backFiller.Stop() backFiller.Stop()
}
superNode.Stop() superNode.Stop()
wg.Wait() wg.Wait()
} }

View File

@ -45,7 +45,7 @@ func NewGethRPCStorageFetcher(streamer streamer.Streamer) GethRPCStorageFetcher
func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) { func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageDiffInput, errs chan<- error) {
ethStatediffPayloadChan := fetcher.StatediffPayloadChan ethStatediffPayloadChan := fetcher.StatediffPayloadChan
clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan) clientSubscription, clientSubErr := fetcher.streamer.Stream(ethStatediffPayloadChan, statediff.Params{})
if clientSubErr != nil { if clientSubErr != nil {
errs <- clientSubErr errs <- clientSubErr
panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr)) panic(fmt.Sprintf("Error creating a geth client subscription: %v", clientSubErr))
@ -55,8 +55,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
for { for {
diff := <-ethStatediffPayloadChan diff := <-ethStatediffPayloadChan
logrus.Trace("received a statediff") logrus.Trace("received a statediff")
stateDiff := new(statediff.StateDiff) stateDiff := new(statediff.StateObject)
decodeErr := rlp.DecodeBytes(diff.StateDiffRlp, stateDiff) decodeErr := rlp.DecodeBytes(diff.StateObjectRlp, stateDiff)
if decodeErr != nil { if decodeErr != nil {
logrus.Warn("Error decoding state diff into RLP: ", decodeErr) logrus.Warn("Error decoding state diff into RLP: ", decodeErr)
errs <- decodeErr errs <- decodeErr
@ -65,8 +65,8 @@ func (fetcher GethRPCStorageFetcher) FetchStorageDiffs(out chan<- utils.StorageD
accounts := utils.GetAccountsFromDiff(*stateDiff) accounts := utils.GetAccountsFromDiff(*stateDiff)
logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber)) logrus.Trace(fmt.Sprintf("iterating through %d accounts on stateDiff for block %d", len(accounts), stateDiff.BlockNumber))
for _, account := range accounts { for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex())) logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex()))
for _, storage := range account.Storage { for _, storage := range account.StorageNodes {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil { if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey)) logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))

View File

@ -33,13 +33,14 @@ import (
type MockStoragediffStreamer struct { type MockStoragediffStreamer struct {
subscribeError error subscribeError error
PassedPayloadChan chan statediff.Payload PassedPayloadChan chan statediff.Payload
PassedParams statediff.Params
streamPayloads []statediff.Payload streamPayloads []statediff.Payload
} }
func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { func (streamer *MockStoragediffStreamer) Stream(statediffPayloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) {
clientSubscription := rpc.ClientSubscription{} clientSubscription := rpc.ClientSubscription{}
streamer.PassedPayloadChan = statediffPayloadChan streamer.PassedPayloadChan = statediffPayloadChan
streamer.PassedParams = params
go func() { go func() {
for _, payload := range streamer.streamPayloads { for _, payload := range streamer.streamPayloads {
streamer.PassedPayloadChan <- payload streamer.PassedPayloadChan <- payload
@ -148,19 +149,19 @@ var _ = Describe("Geth RPC Storage Fetcher", func() {
It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) { It("adds errors to error channel if formatting the diff as a StateDiff object fails", func(done Done) {
accountDiffs := test_data.CreatedAccountDiffs accountDiffs := test_data.CreatedAccountDiffs
accountDiffs[0].Storage = []statediff.StorageDiff{test_data.StorageWithBadValue} accountDiffs[0].StorageNodes = []statediff.StorageNode{test_data.StorageWithBadValue}
stateDiff := statediff.StateDiff{ stateDiff := statediff.StateObject{
BlockNumber: test_data.BlockNumber, BlockNumber: test_data.BlockNumber,
BlockHash: common.HexToHash(test_data.BlockHash), BlockHash: common.HexToHash(test_data.BlockHash),
CreatedAccounts: accountDiffs, Nodes: accountDiffs,
} }
stateDiffRlp, err := rlp.EncodeToBytes(stateDiff) stateDiffRlp, err := rlp.EncodeToBytes(stateDiff)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
badStatediffPayload := statediff.Payload{ badStatediffPayload := statediff.Payload{
StateDiffRlp: stateDiffRlp, StateObjectRlp: stateDiffRlp,
} }
streamer.SetPayloads([]statediff.Payload{badStatediffPayload}) streamer.SetPayloads([]statediff.Payload{badStatediffPayload})

View File

@ -51,12 +51,12 @@ func (mc *BackFillerClient) BatchCall(batch []client.BatchElem) error {
return errors.New("mockclient needs to be initialized with statediff payloads and errors") return errors.New("mockclient needs to be initialized with statediff payloads and errors")
} }
for _, batchElem := range batch { for _, batchElem := range batch {
if len(batchElem.Args) != 1 { if len(batchElem.Args) < 1 {
return errors.New("expected batch elem to contain single argument") return errors.New("expected batch elem to contain an argument(s)")
} }
blockHeight, ok := batchElem.Args[0].(uint64) blockHeight, ok := batchElem.Args[0].(uint64)
if !ok { if !ok {
return errors.New("expected batch elem argument to be a uint64") return errors.New("expected first batch elem argument to be a uint64")
} }
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil { if err != nil {
@ -72,12 +72,12 @@ func (mc *BackFillerClient) BatchCallContext(ctx context.Context, batch []rpc.Ba
return errors.New("mockclient needs to be initialized with statediff payloads and errors") return errors.New("mockclient needs to be initialized with statediff payloads and errors")
} }
for _, batchElem := range batch { for _, batchElem := range batch {
if len(batchElem.Args) != 1 { if len(batchElem.Args) < 1 {
return errors.New("expected batch elem to contain single argument") return errors.New("expected batch elem to contain an argument(s)")
} }
blockHeight, ok := batchElem.Args[0].(uint64) blockHeight, ok := batchElem.Args[0].(uint64)
if !ok { if !ok {
return errors.New("expected batch elem argument to be a uint64") return errors.New("expected batch elem first argument to be a uint64")
} }
err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result) err := json.Unmarshal(mc.MappedStateDiffAt[blockHeight], batchElem.Result)
if err != nil { if err != nil {

View File

@ -120,16 +120,16 @@ func (bf *backFiller) backFillRange(blockHeights []uint64, diffChan chan utils.S
errChan <- fetchErr errChan <- fetchErr
} }
for _, payload := range payloads { for _, payload := range payloads {
stateDiff := new(statediff.StateDiff) stateDiff := new(statediff.StateObject)
stateDiffDecodeErr := rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) stateDiffDecodeErr := rlp.DecodeBytes(payload.StateObjectRlp, stateDiff)
if stateDiffDecodeErr != nil { if stateDiffDecodeErr != nil {
errChan <- stateDiffDecodeErr errChan <- stateDiffDecodeErr
continue continue
} }
accounts := utils.GetAccountsFromDiff(*stateDiff) accounts := utils.GetAccountsFromDiff(*stateDiff)
for _, account := range accounts { for _, account := range accounts {
logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.Storage), common.BytesToHash(account.LeafKey).Hex())) logrus.Trace(fmt.Sprintf("iterating through %d Storage values on account with key %s", len(account.StorageNodes), common.BytesToHash(account.LeafKey).Hex()))
for _, storage := range account.Storage { for _, storage := range account.StorageNodes {
diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage) diff, formatErr := utils.FromGethStateDiff(account, stateDiff, storage)
if formatErr != nil { if formatErr != nil {
logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey)) logrus.Error("failed to format utils.StorageDiff from storage with key: ", common.BytesToHash(storage.LeafKey), "from account with key: ", common.BytesToHash(account.LeafKey))

View File

@ -57,7 +57,7 @@ func FromParityCsvRow(csvRow []string) (StorageDiffInput, error) {
}, nil }, nil
} }
func FromGethStateDiff(account statediff.AccountDiff, stateDiff *statediff.StateDiff, storage statediff.StorageDiff) (StorageDiffInput, error) { func FromGethStateDiff(account statediff.StateNode, stateDiff *statediff.StateObject, storage statediff.StorageNode) (StorageDiffInput, error) {
var decodedValue []byte var decodedValue []byte
err := rlp.DecodeBytes(storage.NodeValue, &decodedValue) err := rlp.DecodeBytes(storage.NodeValue, &decodedValue)
if err != nil { if err != nil {
@ -84,7 +84,6 @@ func HexToKeccak256Hash(hex string) common.Hash {
return crypto.Keccak256Hash(common.FromHex(hex)) return crypto.Keccak256Hash(common.FromHex(hex))
} }
func GetAccountsFromDiff(stateDiff statediff.StateDiff) []statediff.AccountDiff { func GetAccountsFromDiff(stateDiff statediff.StateObject) []statediff.StateNode {
accounts := append(stateDiff.CreatedAccounts, stateDiff.UpdatedAccounts...) return stateDiff.Nodes
return append(accounts, stateDiff.DeletedAccounts...)
} }

View File

@ -67,8 +67,8 @@ var _ = Describe("Storage row parsing", func() {
Describe("FromGethStateDiff", func() { Describe("FromGethStateDiff", func() {
var ( var (
accountDiff = statediff.AccountDiff{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}} accountDiff = statediff.StateNode{LeafKey: []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}}
stateDiff = &statediff.StateDiff{ stateDiff = &statediff.StateObject{
BlockNumber: big.NewInt(rand.Int63()), BlockNumber: big.NewInt(rand.Int63()),
BlockHash: fakes.FakeHash, BlockHash: fakes.FakeHash,
} }
@ -79,7 +79,7 @@ var _ = Describe("Storage row parsing", func() {
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
Expect(encodeErr).NotTo(HaveOccurred()) Expect(encodeErr).NotTo(HaveOccurred())
storageDiff := statediff.StorageDiff{ storageDiff := statediff.StorageNode{
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
NodeValue: storageValueRlp, NodeValue: storageValueRlp,
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
@ -104,7 +104,7 @@ var _ = Describe("Storage row parsing", func() {
storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes) storageValueRlp, encodeErr := rlp.EncodeToBytes(storageValueBytes)
Expect(encodeErr).NotTo(HaveOccurred()) Expect(encodeErr).NotTo(HaveOccurred())
storageDiff := statediff.StorageDiff{ storageDiff := statediff.StorageNode{
LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1}, LeafKey: []byte{0, 9, 8, 7, 6, 5, 4, 3, 2, 1},
NodeValue: storageValueRlp, NodeValue: storageValueRlp,
NodeType: statediff.Leaf, NodeType: statediff.Leaf,

View File

@ -26,7 +26,7 @@ import (
// Streamer is the interface for streaming a statediff subscription // Streamer is the interface for streaming a statediff subscription
type Streamer interface { type Streamer interface {
Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error)
} }
// StateDiffStreamer is the underlying struct for the StateDiffStreamer interface // StateDiffStreamer is the underlying struct for the StateDiffStreamer interface
@ -42,7 +42,7 @@ func NewStateDiffStreamer(client core.RPCClient) Streamer {
} }
// Stream is the main loop for subscribing to data from the Geth state diff process // Stream is the main loop for subscribing to data from the Geth state diff process
func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload) (*rpc.ClientSubscription, error) { func (sds *StateDiffStreamer) Stream(payloadChan chan statediff.Payload, params statediff.Params) (*rpc.ClientSubscription, error) {
logrus.Info("streaming diffs from geth") logrus.Info("streaming diffs from geth")
return sds.Client.Subscribe("statediff", payloadChan, "stream") return sds.Client.Subscribe("statediff", payloadChan, "stream", params)
} }

View File

@ -28,9 +28,14 @@ var _ = Describe("StateDiff Streamer", func() {
client := &fakes.MockRPCClient{} client := &fakes.MockRPCClient{}
streamer := streamer.NewStateDiffStreamer(client) streamer := streamer.NewStateDiffStreamer(client)
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload)
_, err := streamer.Stream(payloadChan) params := statediff.Params{
IncludeBlock: true,
IncludeTD: true,
IncludeReceipts: true,
}
_, err := streamer.Stream(payloadChan, params)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream", params})
}) })
}) })

View File

@ -40,7 +40,7 @@ var (
StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes() StorageKey = common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()
SmallStorageValue = common.Hex2Bytes("03") SmallStorageValue = common.Hex2Bytes("03")
SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue) SmallStorageValueRlp, _ = rlp.EncodeToBytes(SmallStorageValue)
storageWithSmallValue = []statediff.StorageDiff{{ storageWithSmallValue = []statediff.StorageNode{{
LeafKey: StorageKey, LeafKey: StorageKey,
NodeValue: SmallStorageValueRlp, NodeValue: SmallStorageValueRlp,
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
@ -48,13 +48,13 @@ var (
}} }}
LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000") LargeStorageValue = common.Hex2Bytes("00191b53778c567b14b50ba0000")
LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue) LargeStorageValueRlp, _ = rlp.EncodeToBytes(LargeStorageValue)
storageWithLargeValue = []statediff.StorageDiff{{ storageWithLargeValue = []statediff.StorageNode{{
LeafKey: StorageKey, LeafKey: StorageKey,
NodeValue: LargeStorageValueRlp, NodeValue: LargeStorageValueRlp,
Path: StoragePath, Path: StoragePath,
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
}} }}
StorageWithBadValue = statediff.StorageDiff{ StorageWithBadValue = statediff.StorageNode{
LeafKey: StorageKey, LeafKey: StorageKey,
NodeValue: []byte{0, 1, 2}, NodeValue: []byte{0, 1, 2},
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
@ -74,44 +74,40 @@ var (
CodeHash: CodeHash, CodeHash: CodeHash,
} }
valueBytes, _ = rlp.EncodeToBytes(testAccount) valueBytes, _ = rlp.EncodeToBytes(testAccount)
CreatedAccountDiffs = []statediff.AccountDiff{ CreatedAccountDiffs = []statediff.StateNode{
{ {
LeafKey: ContractLeafKey.Bytes(), LeafKey: ContractLeafKey.Bytes(),
NodeValue: valueBytes, NodeValue: valueBytes,
Storage: storageWithSmallValue, StorageNodes: storageWithSmallValue,
}, },
} }
UpdatedAccountDiffs = []statediff.AccountDiff{{ UpdatedAccountDiffs = []statediff.StateNode{{
LeafKey: AnotherContractLeafKey.Bytes(), LeafKey: AnotherContractLeafKey.Bytes(),
NodeValue: valueBytes, NodeValue: valueBytes,
Storage: storageWithLargeValue, StorageNodes: storageWithLargeValue,
}} }}
UpdatedAccountDiffs2 = []statediff.AccountDiff{{ UpdatedAccountDiffs2 = []statediff.StateNode{{
LeafKey: AnotherContractLeafKey.Bytes(), LeafKey: AnotherContractLeafKey.Bytes(),
NodeValue: valueBytes, NodeValue: valueBytes,
Storage: storageWithSmallValue, StorageNodes: storageWithSmallValue,
}} }}
DeletedAccountDiffs = []statediff.AccountDiff{{ DeletedAccountDiffs = []statediff.StateNode{{
LeafKey: AnotherContractLeafKey.Bytes(), LeafKey: AnotherContractLeafKey.Bytes(),
NodeValue: valueBytes, NodeValue: valueBytes,
Storage: storageWithSmallValue, StorageNodes: storageWithSmallValue,
}} }}
MockStateDiff = statediff.StateDiff{ MockStateDiff = statediff.StateObject{
BlockNumber: BlockNumber, BlockNumber: BlockNumber,
BlockHash: common.HexToHash(BlockHash), BlockHash: common.HexToHash(BlockHash),
CreatedAccounts: CreatedAccountDiffs, Nodes: append(append(CreatedAccountDiffs, UpdatedAccountDiffs...), DeletedAccountDiffs...),
DeletedAccounts: DeletedAccountDiffs,
UpdatedAccounts: UpdatedAccountDiffs,
} }
MockStateDiff2 = statediff.StateDiff{ MockStateDiff2 = statediff.StateObject{
BlockNumber: BlockNumber2, BlockNumber: BlockNumber2,
BlockHash: common.HexToHash(BlockHash2), BlockHash: common.HexToHash(BlockHash2),
CreatedAccounts: nil, Nodes: UpdatedAccountDiffs2,
DeletedAccounts: nil,
UpdatedAccounts: UpdatedAccountDiffs2,
} }
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2) MockStateDiff2Bytes, _ = rlp.EncodeToBytes(MockStateDiff2)
@ -145,11 +141,11 @@ var (
MockStatediffPayload = statediff.Payload{ MockStatediffPayload = statediff.Payload{
BlockRlp: MockBlockRlp, BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes, StateObjectRlp: MockStateDiffBytes,
} }
MockStatediffPayload2 = statediff.Payload{ MockStatediffPayload2 = statediff.Payload{
BlockRlp: MockBlockRlp2, BlockRlp: MockBlockRlp2,
StateDiffRlp: MockStateDiff2Bytes, StateObjectRlp: MockStateDiff2Bytes,
} }
CreatedExpectedStorageDiff = utils.StorageDiffInput{ CreatedExpectedStorageDiff = utils.StorageDiffInput{

View File

@ -124,8 +124,8 @@ var (
storageCID = "mockStorageCID1" storageCID = "mockStorageCID1"
storagePath = []byte{'\x01'} storagePath = []byte{'\x01'}
storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000")) storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000"))
storageModels1 = map[common.Hash][]eth2.StorageNodeModel{ storageModels1 = map[string][]eth2.StorageNodeModel{
crypto.Keccak256Hash(state1Path): { common.Bytes2Hex(state1Path): {
{ {
CID: storageCID, CID: storageCID,
StorageKey: storageKey.String(), StorageKey: storageKey.String(),

View File

@ -61,7 +61,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
Receipts: make(types.Receipts, 0, trxLen), Receipts: make(types.Receipts, 0, trxLen),
ReceiptMetaData: make([]ReceiptModel, 0, trxLen), ReceiptMetaData: make([]ReceiptModel, 0, trxLen),
StateNodes: make([]TrieNode, 0), StateNodes: make([]TrieNode, 0),
StorageNodes: make(map[common.Hash][]TrieNode), StorageNodes: make(map[string][]TrieNode),
} }
signer := types.MakeSigner(pc.chainConfig, block.Number()) signer := types.MakeSigner(pc.chainConfig, block.Number())
transactions := block.Transactions() transactions := block.Transactions()
@ -100,10 +100,12 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
} }
mappedContracts[log.Address.String()] = true mappedContracts[log.Address.String()] = true
} }
// These are the contracts seen in the logs
logContracts := make([]string, 0, len(mappedContracts)) logContracts := make([]string, 0, len(mappedContracts))
for addr := range mappedContracts { for addr := range mappedContracts {
logContracts = append(logContracts, addr) logContracts = append(logContracts, addr)
} }
// This is the contract address if this receipt is for a contract creation tx
contract := shared.HandleNullAddr(receipt.ContractAddress) contract := shared.HandleNullAddr(receipt.ContractAddress)
var contractHash string var contractHash string
if contract != "" { if contract != "" {
@ -124,60 +126,27 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
} }
// Unpack state diff rlp to access fields // Unpack state diff rlp to access fields
stateDiff := new(statediff.StateDiff) stateDiff := new(statediff.StateObject)
if err := rlp.DecodeBytes(stateDiffPayload.StateDiffRlp, stateDiff); err != nil { if err := rlp.DecodeBytes(stateDiffPayload.StateObjectRlp, stateDiff); err != nil {
return nil, err return nil, err
} }
for _, createdAccount := range stateDiff.CreatedAccounts { for _, stateNode := range stateDiff.Nodes {
statePathHash := crypto.Keccak256Hash(createdAccount.Path) statePath := common.Bytes2Hex(stateNode.Path)
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{ convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
Path: createdAccount.Path, Path: stateNode.Path,
Value: createdAccount.NodeValue, Value: stateNode.NodeValue,
Type: createdAccount.NodeType, Type: stateNode.NodeType,
LeafKey: common.BytesToHash(createdAccount.LeafKey), LeafKey: common.BytesToHash(stateNode.LeafKey),
}) })
for _, storageDiff := range createdAccount.Storage { for _, storageNode := range stateNode.StorageNodes {
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{ convertedPayload.StorageNodes[statePath] = append(convertedPayload.StorageNodes[statePath], TrieNode{
Path: storageDiff.Path, Path: storageNode.Path,
Value: storageDiff.NodeValue, Value: storageNode.NodeValue,
Type: storageDiff.NodeType, Type: storageNode.NodeType,
LeafKey: common.BytesToHash(storageDiff.LeafKey), LeafKey: common.BytesToHash(storageNode.LeafKey),
})
}
}
for _, deletedAccount := range stateDiff.DeletedAccounts {
statePathHash := crypto.Keccak256Hash(deletedAccount.Path)
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
Path: deletedAccount.Path,
Value: deletedAccount.NodeValue,
Type: deletedAccount.NodeType,
LeafKey: common.BytesToHash(deletedAccount.LeafKey),
})
for _, storageDiff := range deletedAccount.Storage {
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{
Path: storageDiff.Path,
Value: storageDiff.NodeValue,
Type: storageDiff.NodeType,
LeafKey: common.BytesToHash(storageDiff.LeafKey),
})
}
}
for _, updatedAccount := range stateDiff.UpdatedAccounts {
statePathHash := crypto.Keccak256Hash(updatedAccount.Path)
convertedPayload.StateNodes = append(convertedPayload.StateNodes, TrieNode{
Path: updatedAccount.Path,
Value: updatedAccount.NodeValue,
Type: updatedAccount.NodeType,
LeafKey: common.BytesToHash(updatedAccount.LeafKey),
})
for _, storageDiff := range updatedAccount.Storage {
convertedPayload.StorageNodes[statePathHash] = append(convertedPayload.StorageNodes[statePathHash], TrieNode{
Path: storageDiff.Path,
Value: storageDiff.NodeValue,
Type: storageDiff.NodeType,
LeafKey: common.BytesToHash(storageDiff.LeafKey),
}) })
} }
} }
return convertedPayload, nil return convertedPayload, nil
} }

View File

@ -291,7 +291,7 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
} }
} }
if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) {
for _, storageNode := range payload.StorageNodes[crypto.Keccak256Hash(stateNode.Path)] { for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) {
cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256) cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256)
if err != nil { if err != nil {

View File

@ -26,6 +26,8 @@ func ResolveFromNodeType(nodeType statediff.NodeType) int {
return 1 return 1
case statediff.Leaf: case statediff.Leaf:
return 2 return 2
case statediff.Removed:
return 3
default: default:
return -1 return -1
} }
@ -39,6 +41,8 @@ func ResolveToNodeType(nodeType int) statediff.NodeType {
return statediff.Extension return statediff.Extension
case 2: case 2:
return statediff.Leaf return statediff.Leaf
case 3:
return statediff.Removed
default: default:
return statediff.Unknown return statediff.Unknown
} }

View File

@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -159,13 +158,13 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload,
} }
// If we have a state leaf node, index the associated account and storage nodes // If we have a state leaf node, index the associated account and storage nodes
if stateCID.NodeType == 2 { if stateCID.NodeType == 2 {
pathKey := crypto.Keccak256Hash(stateCID.Path) statePath := common.Bytes2Hex(stateCID.Path)
for _, storageCID := range payload.StorageNodeCIDs[pathKey] { for _, storageCID := range payload.StorageNodeCIDs[statePath] {
if err := in.indexStorageCID(tx, storageCID, stateID); err != nil { if err := in.indexStorageCID(tx, storageCID, stateID); err != nil {
return err return err
} }
} }
if stateAccount, ok := payload.StateAccounts[pathKey]; ok { if stateAccount, ok := payload.StateAccounts[statePath]; ok {
if err := in.indexStateAccount(tx, stateAccount, stateID); err != nil { if err := in.indexStateAccount(tx, stateAccount, stateID); err != nil {
return err return err
} }

View File

@ -218,7 +218,7 @@ var (
nonce1 = uint64(1) nonce1 = uint64(1)
ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0" ContractRoot = "0x821e2556a290c86405f8160a2d662042a431ba456b9db265c79bb837c04be5f0"
ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea") ContractCodeHash = common.HexToHash("0x753f98a8d4328b15636e46f66f2cb4bc860100aa17967cc145fcd17d1d4710ea")
contractPathHash = crypto.Keccak256Hash([]byte{'\x06'}) contractPath = common.Bytes2Hex([]byte{'\x06'})
ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress) ContractLeafKey = testhelpers.AddressToLeafKey(ContractAddress)
ContractAccount, _ = rlp.EncodeToBytes(state.Account{ ContractAccount, _ = rlp.EncodeToBytes(state.Account{
Nonce: nonce1, Nonce: nonce1,
@ -235,7 +235,7 @@ var (
nonce0 = uint64(0) nonce0 = uint64(0)
AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
accountPathHash = crypto.Keccak256Hash([]byte{'\x0c'}) accountPath = common.Bytes2Hex([]byte{'\x0c'})
AccountAddresss = common.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e") AccountAddresss = common.HexToAddress("0x0D3ab14BBaD3D99F4203bd7a11aCB94882050E7e")
AccountLeafKey = testhelpers.Account2LeafKey AccountLeafKey = testhelpers.Account2LeafKey
Account, _ = rlp.EncodeToBytes(state.Account{ Account, _ = rlp.EncodeToBytes(state.Account{
@ -250,13 +250,13 @@ var (
Account, Account,
}) })
CreatedAccountDiffs = []statediff.AccountDiff{ StateDiffs = []statediff.StateNode{
{ {
Path: []byte{'\x06'}, Path: []byte{'\x06'},
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
LeafKey: ContractLeafKey, LeafKey: ContractLeafKey,
NodeValue: ContractLeafNode, NodeValue: ContractLeafNode,
Storage: []statediff.StorageDiff{ StorageNodes: []statediff.StorageNode{
{ {
Path: []byte{}, Path: []byte{},
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
@ -270,14 +270,14 @@ var (
NodeType: statediff.Leaf, NodeType: statediff.Leaf,
LeafKey: AccountLeafKey, LeafKey: AccountLeafKey,
NodeValue: AccountLeafNode, NodeValue: AccountLeafNode,
Storage: []statediff.StorageDiff{}, StorageNodes: []statediff.StorageNode{},
}, },
} }
MockStateDiff = statediff.StateDiff{ MockStateDiff = statediff.StateObject{
BlockNumber: BlockNumber, BlockNumber: BlockNumber,
BlockHash: MockBlock.Hash(), BlockHash: MockBlock.Hash(),
CreatedAccounts: CreatedAccountDiffs, Nodes: StateDiffs,
} }
MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff) MockStateDiffBytes, _ = rlp.EncodeToBytes(MockStateDiff)
MockStateNodes = []eth.TrieNode{ MockStateNodes = []eth.TrieNode{
@ -308,8 +308,8 @@ var (
StateKey: common.BytesToHash(AccountLeafKey).Hex(), StateKey: common.BytesToHash(AccountLeafKey).Hex(),
}, },
} }
MockStorageNodes = map[common.Hash][]eth.TrieNode{ MockStorageNodes = map[string][]eth.TrieNode{
contractPathHash: { contractPath: {
{ {
LeafKey: common.BytesToHash(StorageLeafKey), LeafKey: common.BytesToHash(StorageLeafKey),
Value: StorageLeafNode, Value: StorageLeafNode,
@ -322,7 +322,7 @@ var (
// aggregate payloads // aggregate payloads
MockStateDiffPayload = statediff.Payload{ MockStateDiffPayload = statediff.Payload{
BlockRlp: MockBlockRlp, BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes, StateObjectRlp: MockStateDiffBytes,
ReceiptsRlp: ReceiptsRlp, ReceiptsRlp: ReceiptsRlp,
TotalDifficulty: MockBlock.Difficulty(), TotalDifficulty: MockBlock.Difficulty(),
} }
@ -360,8 +360,8 @@ var (
MockTransactions[2].Hash(): MockRctMetaPostPublish[2], MockTransactions[2].Hash(): MockRctMetaPostPublish[2],
}, },
StateNodeCIDs: MockStateMetaPostPublish, StateNodeCIDs: MockStateMetaPostPublish,
StorageNodeCIDs: map[common.Hash][]eth.StorageNodeModel{ StorageNodeCIDs: map[string][]eth.StorageNodeModel{
contractPathHash: { contractPath: {
{ {
CID: StorageCID.String(), CID: StorageCID.String(),
Path: []byte{}, Path: []byte{},
@ -370,14 +370,14 @@ var (
}, },
}, },
}, },
StateAccounts: map[common.Hash]eth.StateAccountModel{ StateAccounts: map[string]eth.StateAccountModel{
contractPathHash: { contractPath: {
Balance: big.NewInt(0).String(), Balance: big.NewInt(0).String(),
Nonce: nonce1, Nonce: nonce1,
CodeHash: ContractCodeHash.Bytes(), CodeHash: ContractCodeHash.Bytes(),
StorageRoot: common.HexToHash(ContractRoot).String(), StorageRoot: common.HexToHash(ContractRoot).String(),
}, },
accountPathHash: { accountPath: {
Balance: big.NewInt(1000).String(), Balance: big.NewInt(1000).String(),
Nonce: nonce0, Nonce: nonce0,
CodeHash: AccountCodeHash.Bytes(), CodeHash: AccountCodeHash.Bytes(),

View File

@ -38,34 +38,41 @@ type PayloadFetcher struct {
// http.Client is thread-safe // http.Client is thread-safe
client BatchClient client BatchClient
timeout time.Duration timeout time.Duration
params statediff.Params
} }
const method = "statediff_stateDiffAt" const method = "statediff_stateDiffAt"
// NewStateDiffFetcher returns a PayloadFetcher // NewPayloadFetcher returns a PayloadFetcher
func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher { func NewPayloadFetcher(bc BatchClient, timeout time.Duration) *PayloadFetcher {
return &PayloadFetcher{ return &PayloadFetcher{
client: bc, client: bc,
timeout: timeout, timeout: timeout,
params: statediff.Params{
IncludeReceipts: true,
IncludeTD: true,
IncludeBlock: true,
IntermediateStateNodes: true,
IntermediateStorageNodes: true,
},
} }
} }
// FetchAt fetches the statediff payloads at the given block heights // FetchAt fetches the statediff payloads at the given block heights
// Calls StateDiffAt(ctx context.Context, blockNumber uint64) (*Payload, error) // Calls StateDiffAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error)
func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) { func (fetcher *PayloadFetcher) FetchAt(blockHeights []uint64) ([]shared.RawChainData, error) {
batch := make([]rpc.BatchElem, 0) batch := make([]rpc.BatchElem, 0)
for _, height := range blockHeights { for _, height := range blockHeights {
batch = append(batch, rpc.BatchElem{ batch = append(batch, rpc.BatchElem{
Method: method, Method: method,
Args: []interface{}{height}, Args: []interface{}{height, fetcher.params},
Result: new(statediff.Payload), Result: new(statediff.Payload),
}) })
} }
ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout) ctx, cancel := context.WithTimeout(context.Background(), fetcher.timeout)
defer cancel() defer cancel()
batchErr := fetcher.client.BatchCallContext(ctx, batch) if err := fetcher.client.BatchCallContext(ctx, batch); err != nil {
if batchErr != nil { return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], err.Error())
return nil, fmt.Errorf("ethereum PayloadFetcher batch err for block range %d-%d: %s", blockHeights[0], blockHeights[len(blockHeights)-1], batchErr.Error())
} }
results := make([]shared.RawChainData, 0, len(blockHeights)) results := make([]shared.RawChainData, 0, len(blockHeights))
for _, batchElem := range batch { for _, batchElem := range batch {

View File

@ -36,10 +36,10 @@ var _ = Describe("StateDiffFetcher", func() {
) )
BeforeEach(func() { BeforeEach(func() {
mc = new(mocks.BackFillerClient) mc = new(mocks.BackFillerClient)
setDiffAtErr1 := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload) err := mc.SetReturnDiffAt(test_data.BlockNumber.Uint64(), test_data.MockStatediffPayload)
Expect(setDiffAtErr1).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
setDiffAtErr2 := mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2) err = mc.SetReturnDiffAt(test_data.BlockNumber2.Uint64(), test_data.MockStatediffPayload2)
Expect(setDiffAtErr2).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60) stateDiffFetcher = eth.NewPayloadFetcher(mc, time.Second*60)
}) })
It("Batch calls statediff_stateDiffAt", func() { It("Batch calls statediff_stateDiffAt", func() {
@ -47,8 +47,8 @@ var _ = Describe("StateDiffFetcher", func() {
test_data.BlockNumber.Uint64(), test_data.BlockNumber.Uint64(),
test_data.BlockNumber2.Uint64(), test_data.BlockNumber2.Uint64(),
} }
stateDiffPayloads, fetchErr := stateDiffFetcher.FetchAt(blockHeights) stateDiffPayloads, err := stateDiffFetcher.FetchAt(blockHeights)
Expect(fetchErr).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(len(stateDiffPayloads)).To(Equal(2)) Expect(len(stateDiffPayloads)).To(Equal(2))
payload1, ok := stateDiffPayloads[0].(statediff.Payload) payload1, ok := stateDiffPayloads[0].(statediff.Payload)
Expect(ok).To(BeTrue()) Expect(ok).To(BeTrue())

View File

@ -19,8 +19,8 @@ package eth
import ( import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
@ -195,8 +195,7 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil { if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil {
return err return err
} }
statePathHash := crypto.Keccak256Hash(stateNode.Path) for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
for _, storageNode := range ipldPayload.StorageNodes[statePathHash] {
storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value) storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value)
if err != nil { if err != nil {
return err return err

View File

@ -22,7 +22,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
@ -206,9 +205,9 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr
return rctCids, nil return rctCids, nil
} }
func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[common.Hash]StateAccountModel, error) { func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[string]StateAccountModel, error) {
stateNodeCids := make([]StateNodeModel, 0, len(stateNodes)) stateNodeCids := make([]StateNodeModel, 0, len(stateNodes))
stateAccounts := make(map[common.Hash]StateAccountModel) stateAccounts := make(map[string]StateAccountModel)
for _, stateNode := range stateNodes { for _, stateNode := range stateNodes {
node, err := ipld.FromStateTrieRLP(stateNode.Value) node, err := ipld.FromStateTrieRLP(stateNode.Value)
if err != nil { if err != nil {
@ -238,8 +237,8 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM
return nil, nil, err return nil, nil, err
} }
// Map state account to the state path hash // Map state account to the state path hash
statePathHash := crypto.Keccak256Hash(stateNode.Path) statePath := common.Bytes2Hex(stateNode.Path)
stateAccounts[statePathHash] = StateAccountModel{ stateAccounts[statePath] = StateAccountModel{
Balance: account.Balance.String(), Balance: account.Balance.String(),
Nonce: account.Nonce, Nonce: account.Nonce,
CodeHash: account.CodeHash, CodeHash: account.CodeHash,
@ -250,10 +249,10 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM
return stateNodeCids, stateAccounts, nil return stateNodeCids, stateAccounts, nil
} }
func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]TrieNode) (map[common.Hash][]StorageNodeModel, error) { func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode) (map[string][]StorageNodeModel, error) {
storageLeafCids := make(map[common.Hash][]StorageNodeModel) storageLeafCids := make(map[string][]StorageNodeModel)
for pathHash, storageTrie := range storageNodes { for path, storageTrie := range storageNodes {
storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie)) storageLeafCids[path] = make([]StorageNodeModel, 0, len(storageTrie))
for _, storageNode := range storageTrie { for _, storageNode := range storageTrie {
node, err := ipld.FromStorageTrieRLP(storageNode.Value) node, err := ipld.FromStorageTrieRLP(storageNode.Value)
if err != nil { if err != nil {
@ -264,7 +263,7 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]Tri
return nil, err return nil, err
} }
// Map storage node cids to the state path hash // Map storage node cids to the state path hash
storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{ storageLeafCids[path] = append(storageLeafCids[path], StorageNodeModel{
Path: storageNode.Path, Path: storageNode.Path,
StorageKey: storageNode.LeafKey.Hex(), StorageKey: storageNode.LeafKey.Hex(),
CID: cid, CID: cid,

View File

@ -38,12 +38,20 @@ type StreamClient interface {
// PayloadStreamer satisfies the PayloadStreamer interface for ethereum // PayloadStreamer satisfies the PayloadStreamer interface for ethereum
type PayloadStreamer struct { type PayloadStreamer struct {
Client StreamClient Client StreamClient
params statediff.Params
} }
// NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum // NewPayloadStreamer creates a pointer to a new PayloadStreamer which satisfies the PayloadStreamer interface for ethereum
func NewPayloadStreamer(client StreamClient) *PayloadStreamer { func NewPayloadStreamer(client StreamClient) *PayloadStreamer {
return &PayloadStreamer{ return &PayloadStreamer{
Client: client, Client: client,
params: statediff.Params{
IncludeBlock: true,
IncludeTD: true,
IncludeReceipts: true,
IntermediateStorageNodes: true,
IntermediateStateNodes: true,
},
} }
} }
@ -60,5 +68,5 @@ func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.
} }
} }
}() }()
return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream") return ps.Client.Subscribe(context.Background(), "statediff", stateDiffChan, "stream", ps.params)
} }

View File

@ -37,7 +37,7 @@ type ConvertedPayload struct {
Receipts types.Receipts Receipts types.Receipts
ReceiptMetaData []ReceiptModel ReceiptMetaData []ReceiptModel
StateNodes []TrieNode StateNodes []TrieNode
StorageNodes map[common.Hash][]TrieNode StorageNodes map[string][]TrieNode
} }
// Height satisfies the StreamedIPLDs interface // Height satisfies the StreamedIPLDs interface
@ -62,8 +62,8 @@ type CIDPayload struct {
TransactionCIDs []TxModel TransactionCIDs []TxModel
ReceiptCIDs map[common.Hash]ReceiptModel ReceiptCIDs map[common.Hash]ReceiptModel
StateNodeCIDs []StateNodeModel StateNodeCIDs []StateNodeModel
StateAccounts map[common.Hash]StateAccountModel StateAccounts map[string]StateAccountModel
StorageNodeCIDs map[common.Hash][]StorageNodeModel StorageNodeCIDs map[string][]StorageNodeModel
} }
// CIDWrapper is used to direct fetching of IPLDs from IPFS // CIDWrapper is used to direct fetching of IPLDs from IPFS

View File

@ -56,7 +56,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
cids.TransactionCIDs = make([]eth.TxModel, numTxs) cids.TransactionCIDs = make([]eth.TxModel, numTxs)
cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs) cids.ReceiptCIDs = make(map[common.Hash]eth.ReceiptModel, numTxs)
cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes)) cids.StateNodeCIDs = make([]eth.StateNodeModel, len(ethIPLDs.StateNodes))
cids.StorageNodeCIDs = make(map[common.Hash][]eth.StorageNodeModel, len(ethIPLDs.StateNodes)) cids.StorageNodeCIDs = make(map[string][]eth.StorageNodeModel, len(ethIPLDs.StateNodes))
// Unpack header // Unpack header
var header types.Header var header types.Header
@ -164,7 +164,7 @@ func (pc *WatcherConverter) Convert(ethIPLDs eth.IPLDs) (*eth.CIDPayload, error)
} }
// Storage data // Storage data
for _, storageIPLD := range ethIPLDs.StorageNodes { for _, storageIPLD := range ethIPLDs.StorageNodes {
cids.StorageNodeCIDs[storageIPLD.StateLeafKey] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey], eth.StorageNodeModel{ cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()] = append(cids.StorageNodeCIDs[storageIPLD.StateLeafKey.Hex()], eth.StorageNodeModel{
CID: storageIPLD.IPLD.CID, CID: storageIPLD.IPLD.CID,
NodeType: eth.ResolveFromNodeType(storageIPLD.Type), NodeType: eth.ResolveFromNodeType(storageIPLD.Type),
StorageKey: storageIPLD.StorageLeafKey.String(), StorageKey: storageIPLD.StorageLeafKey.String(),