From 5173edf56382f3f50920e51bf25619f8a017450c Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 10 Feb 2020 11:11:44 -0600 Subject: [PATCH] fix broken go-ipld-eth trie node dag putters --- .../00012_create_eth_header_cids_table.sql | 2 +- db/schema.sql | 2 +- environments/btcSuperNode.toml | 26 ++ environments/ethSuperNode.toml | 26 ++ pkg/ipfs/ipld/btc_header.go | 19 - pkg/ipfs/ipld/btc_tx.go | 19 - pkg/ipfs/ipld/eth_header.go | 18 - pkg/ipfs/ipld/eth_receipt.go | 18 - pkg/ipfs/ipld/eth_state.go | 70 ++- pkg/ipfs/ipld/eth_storage.go | 57 ++- pkg/ipfs/ipld/shared.go | 13 - pkg/ipfs/ipld/trie_node.go | 440 ------------------ pkg/super_node/eth/streamer.go | 14 +- pkg/super_node/eth/streamer_test.go | 3 +- 14 files changed, 135 insertions(+), 592 deletions(-) create mode 100644 environments/btcSuperNode.toml create mode 100644 environments/ethSuperNode.toml delete mode 100644 pkg/ipfs/ipld/trie_node.go diff --git a/db/migrations/00012_create_eth_header_cids_table.sql b/db/migrations/00012_create_eth_header_cids_table.sql index 5e646e0f..a676b12e 100644 --- a/db/migrations/00012_create_eth_header_cids_table.sql +++ b/db/migrations/00012_create_eth_header_cids_table.sql @@ -5,7 +5,7 @@ CREATE TABLE eth.header_cids ( block_hash VARCHAR(66) NOT NULL, parent_hash VARCHAR(66) NOT NULL, cid TEXT NOT NULL, - td BIGINT, + td NUMERIC NOT NULL, node_id INTEGER NOT NULL REFERENCES nodes (id) ON DELETE CASCADE, UNIQUE (block_number, block_hash) ); diff --git a/db/schema.sql b/db/schema.sql index 4dc99692..74398a65 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -183,7 +183,7 @@ CREATE TABLE eth.header_cids ( block_hash character varying(66) NOT NULL, parent_hash character varying(66) NOT NULL, cid text NOT NULL, - td bigint, + td numeric NOT NULL, node_id integer NOT NULL ); diff --git a/environments/btcSuperNode.toml b/environments/btcSuperNode.toml new file mode 100644 index 00000000..b99c7a14 --- /dev/null +++ b/environments/btcSuperNode.toml @@ -0,0 +1,26 @@ +[superNode] + chain = "bitcoin" + ipfsPath = "/root/.ipfs" + + [superNode.database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "ec2-user" + + [superNode.sync] + on = true + wsPath = "http://127.0.0.1:8332" + workers = 1 + + [superNode.server] + on = true + ipcPath = "/root/.vulcanize/btc/vulcanize.ipc" + wsPath = "127.0.0.1:8082" + httpPath = "127.0.0.1:8083" + + [superNode.backFill] + on = true + httpPath = "http://127.0.0.1:8332" + frequency = 5 + batchSize = 50 \ No newline at end of file diff --git a/environments/ethSuperNode.toml b/environments/ethSuperNode.toml new file mode 100644 index 00000000..f489d1ef --- /dev/null +++ b/environments/ethSuperNode.toml @@ -0,0 +1,26 @@ +[superNode] + chain = "ethereum" + ipfsPath = "/root/.ipfs" + + [superNode.database] + name = "vulcanize_public" + hostname = "localhost" + port = 5432 + user = "ec2-user" + + [superNode.sync] + on = true + wsPath = "ws://127.0.0.1:8546" + workers = 1 + + [superNode.server] + on = true + ipcPath = "/root/.vulcanize/eth/vulcanize.ipc" + wsPath = "127.0.0.1:8080" + httpPath = "127.0.0.1:8081" + + [superNode.backFill] + on = true + httpPath = "http://127.0.0.1:8545" + frequency = 5 + batchSize = 50 \ No newline at end of file diff --git a/pkg/ipfs/ipld/btc_header.go b/pkg/ipfs/ipld/btc_header.go index 4a09d4ab..8e630057 100644 --- a/pkg/ipfs/ipld/btc_header.go +++ b/pkg/ipfs/ipld/btc_header.go @@ -59,25 +59,6 @@ func NewBtcHeader(header *wire.BlockHeader) (*BtcHeader, error) { }, nil } -/* - OUTPUT -*/ - -// DecodeBtcHeader takes a cid and its raw binary data -// from IPFS and returns an BtcHeader object for further processing. -func DecodeBtcHeader(c cid.Cid, b []byte) (*BtcHeader, error) { - var h *wire.BlockHeader - w := bytes.NewBuffer(b) - if err := h.Deserialize(w); err != nil { - return nil, err - } - return &BtcHeader{ - BlockHeader: h, - cid: c, - rawdata: b, - }, nil -} - /* Block INTERFACE */ diff --git a/pkg/ipfs/ipld/btc_tx.go b/pkg/ipfs/ipld/btc_tx.go index e92b29c1..b535a233 100644 --- a/pkg/ipfs/ipld/btc_tx.go +++ b/pkg/ipfs/ipld/btc_tx.go @@ -44,25 +44,6 @@ func NewBtcTx(tx *wire.MsgTx) (*BtcTx, error) { }, nil } -/* - OUTPUT -*/ - -// DecodeBtcTx takes a cid and its raw binary data -// from IPFS and returns an BtcTx object for further processing. -func DecodeBtcTx(c cid.Cid, b []byte) (*BtcTx, error) { - var tx *wire.MsgTx - w := bytes.NewBuffer(b) - if err := tx.Deserialize(w); err != nil { - return nil, err - } - return &BtcTx{ - MsgTx: tx, - cid: c, - rawdata: b, - }, nil -} - /* Block INTERFACE */ diff --git a/pkg/ipfs/ipld/eth_header.go b/pkg/ipfs/ipld/eth_header.go index 7ae89c28..3f0ae730 100644 --- a/pkg/ipfs/ipld/eth_header.go +++ b/pkg/ipfs/ipld/eth_header.go @@ -60,24 +60,6 @@ func NewEthHeader(header *types.Header) (*EthHeader, error) { }, nil } -/* - OUTPUT -*/ - -// DecodeEthHeader takes a cid and its raw binary data -// from IPFS and returns an EthHeader object for further processing. -func DecodeEthHeader(c cid.Cid, b []byte) (*EthHeader, error) { - var h *types.Header - if err := rlp.DecodeBytes(b, h); err != nil { - return nil, err - } - return &EthHeader{ - Header: h, - cid: c, - rawdata: b, - }, nil -} - /* Block INTERFACE */ diff --git a/pkg/ipfs/ipld/eth_receipt.go b/pkg/ipfs/ipld/eth_receipt.go index bbceb09f..99915f1f 100644 --- a/pkg/ipfs/ipld/eth_receipt.go +++ b/pkg/ipfs/ipld/eth_receipt.go @@ -59,24 +59,6 @@ func NewReceipt(receipt *types.ReceiptForStorage) (*EthReceipt, error) { }, nil } -/* - OUTPUT -*/ - -// DecodeEthReceipt takes a cid and its raw binary data -// from IPFS and returns an EthReceipt object for further processing. -func DecodeEthReceipt(c cid.Cid, b []byte) (*EthReceipt, error) { - var r *types.ReceiptForStorage - if err := rlp.DecodeBytes(b, r); err != nil { - return nil, err - } - return &EthReceipt{ - ReceiptForStorage: r, - cid: c, - rawdata: b, - }, nil -} - /* Block INTERFACE */ diff --git a/pkg/ipfs/ipld/eth_state.go b/pkg/ipfs/ipld/eth_state.go index bcc9cb34..c01765bb 100644 --- a/pkg/ipfs/ipld/eth_state.go +++ b/pkg/ipfs/ipld/eth_state.go @@ -19,7 +19,6 @@ package ipld import ( "fmt" - "github.com/ethereum/go-ethereum/rlp" "github.com/ipfs/go-cid" node "github.com/ipfs/go-ipld-format" mh "github.com/multiformats/go-multihash" @@ -28,7 +27,8 @@ import ( // EthStateTrie (eth-state-trie, codec 0x96), represents // a node from the state trie in ethereum. type EthStateTrie struct { - *TrieNode + cid cid.Cid + rawdata []byte } // Static (compile time) check that EthStateTrie satisfies the node.Node interface. @@ -45,40 +45,9 @@ func FromStateTrieRLP(stateNodeRLP []byte) (*EthStateTrie, error) { if err != nil { return nil, err } - return DecodeEthStateTrie(c, stateNodeRLP) -} - -/* - OUTPUT -*/ - -// DecodeEthStateTrie returns an EthStateTrie object from its cid and rawdata. -func DecodeEthStateTrie(c cid.Cid, b []byte) (*EthStateTrie, error) { - tn, err := decodeTrieNode(c, b, decodeEthStateTrieLeaf) - if err != nil { - return nil, err - } - return &EthStateTrie{TrieNode: tn}, nil -} - -// decodeEthStateTrieLeaf parses a eth-tx-trie leaf -// from decoded RLP elements -func decodeEthStateTrieLeaf(i []interface{}) ([]interface{}, error) { - var account EthAccount - if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { - return nil, err - } - c, err := rawdataToCid(MEthAccountSnapshot, i[1].([]byte), mh.KECCAK_256) - if err != nil { - return nil, err - } - return []interface{}{ - i[0].([]byte), - &EthAccountSnapshot{ - EthAccount: &account, - cid: c, - rawdata: i[1].([]byte), - }, + return &EthStateTrie{ + cid: c, + rawdata: stateNodeRLP, }, nil } @@ -101,6 +70,35 @@ func (st *EthStateTrie) String() string { return fmt.Sprintf("", st.cid) } +// Copy will go away. It is here to comply with the Node interface. +func (*EthStateTrie) Copy() node.Node { + panic("implement me") +} + +func (*EthStateTrie) Links() []*node.Link { + panic("implement me") +} + +func (*EthStateTrie) Resolve(path []string) (interface{}, []string, error) { + panic("implement me") +} + +func (*EthStateTrie) ResolveLink(path []string) (*node.Link, []string, error) { + panic("implement me") +} + +func (*EthStateTrie) Size() (uint64, error) { + panic("implement me") +} + +func (*EthStateTrie) Stat() (*node.NodeStat, error) { + panic("implement me") +} + +func (*EthStateTrie) Tree(path string, depth int) []string { + panic("implement me") +} + // Loggable returns in a map the type of IPLD Link. func (st *EthStateTrie) Loggable() map[string]interface{} { return map[string]interface{}{ diff --git a/pkg/ipfs/ipld/eth_storage.go b/pkg/ipfs/ipld/eth_storage.go index d1dc1c9f..6d9e1cbe 100644 --- a/pkg/ipfs/ipld/eth_storage.go +++ b/pkg/ipfs/ipld/eth_storage.go @@ -27,7 +27,8 @@ import ( // EthStorageTrie (eth-storage-trie, codec 0x98), represents // a node from the storage trie in ethereum. type EthStorageTrie struct { - *TrieNode + cid cid.Cid + rawdata []byte } // Static (compile time) check that EthStorageTrie satisfies the node.Node interface. @@ -44,28 +45,9 @@ func FromStorageTrieRLP(storageNodeRLP []byte) (*EthStorageTrie, error) { if err != nil { return nil, err } - return DecodeEthStorageTrie(c, storageNodeRLP) -} - -/* - OUTPUT -*/ - -// DecodeEthStorageTrie returns an EthStorageTrie object from its cid and rawdata. -func DecodeEthStorageTrie(c cid.Cid, b []byte) (*EthStorageTrie, error) { - tn, err := decodeTrieNode(c, b, decodeEthStorageTrieLeaf) - if err != nil { - return nil, err - } - return &EthStorageTrie{TrieNode: tn}, nil -} - -// decodeEthStorageTrieLeaf parses a eth-tx-trie leaf -// from decoded RLP elements -func decodeEthStorageTrieLeaf(i []interface{}) ([]interface{}, error) { - return []interface{}{ - i[0].([]byte), - i[1].([]byte), + return &EthStorageTrie{ + cid: c, + rawdata: storageNodeRLP, }, nil } @@ -88,6 +70,35 @@ func (st *EthStorageTrie) String() string { return fmt.Sprintf("", st.cid) } +// Copy will go away. It is here to comply with the Node interface. +func (*EthStorageTrie) Copy() node.Node { + panic("implement me") +} + +func (*EthStorageTrie) Links() []*node.Link { + panic("implement me") +} + +func (*EthStorageTrie) Resolve(path []string) (interface{}, []string, error) { + panic("implement me") +} + +func (*EthStorageTrie) ResolveLink(path []string) (*node.Link, []string, error) { + panic("implement me") +} + +func (*EthStorageTrie) Size() (uint64, error) { + panic("implement me") +} + +func (*EthStorageTrie) Stat() (*node.NodeStat, error) { + panic("implement me") +} + +func (*EthStorageTrie) Tree(path string, depth int) []string { + panic("implement me") +} + // Loggable returns in a map the type of IPLD Link. func (st *EthStorageTrie) Loggable() map[string]interface{} { return map[string]interface{}{ diff --git a/pkg/ipfs/ipld/shared.go b/pkg/ipfs/ipld/shared.go index 0d87c0ee..a47debe7 100644 --- a/pkg/ipfs/ipld/shared.go +++ b/pkg/ipfs/ipld/shared.go @@ -17,10 +17,7 @@ package ipld import ( - "bytes" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/rlp" "github.com/ipfs/go-cid" mh "github.com/multiformats/go-multihash" ) @@ -90,13 +87,3 @@ func sha256ToCid(codec uint64, h []byte) cid.Cid { return cid.NewCidV1(codec, hash) } - -// getRLP encodes the given object to RLP returning its bytes. -func getRLP(object interface{}) []byte { - buf := new(bytes.Buffer) - if err := rlp.Encode(buf, object); err != nil { - panic(err) - } - - return buf.Bytes() -} diff --git a/pkg/ipfs/ipld/trie_node.go b/pkg/ipfs/ipld/trie_node.go deleted file mode 100644 index 4534138e..00000000 --- a/pkg/ipfs/ipld/trie_node.go +++ /dev/null @@ -1,440 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "encoding/json" - "fmt" - - "github.com/ethereum/go-ethereum/rlp" - "github.com/ipfs/go-cid" - node "github.com/ipfs/go-ipld-format" -) - -// TrieNode is the general abstraction for -//ethereum IPLD trie nodes. -type TrieNode struct { - // leaf, extension or branch - nodeKind string - - // If leaf or extension: [0] is key, [1] is val. - // If branch: [0] - [16] are children. - elements []interface{} - - // IPLD block information - cid cid.Cid - rawdata []byte -} - -/* - OUTPUT -*/ - -type trieNodeLeafDecoder func([]interface{}) ([]interface{}, error) - -// decodeTrieNode returns a TrieNode object from an IPLD block's -// cid and rawdata. -func decodeTrieNode(c cid.Cid, b []byte, - leafDecoder trieNodeLeafDecoder) (*TrieNode, error) { - var ( - i, decoded, elements []interface{} - nodeKind string - err error - ) - - err = rlp.DecodeBytes(b, &i) - if err != nil { - return nil, err - } - - codec := c.Type() - switch len(i) { - case 2: - nodeKind, decoded, err = decodeCompactKey(i) - if err != nil { - return nil, err - } - - if nodeKind == "extension" { - elements, err = parseTrieNodeExtension(decoded, codec) - } - if nodeKind == "leaf" { - elements, err = leafDecoder(decoded) - } - if nodeKind != "extension" && nodeKind != "leaf" { - return nil, fmt.Errorf("unexpected nodeKind returned from decoder") - } - case 17: - nodeKind = "branch" - elements, err = parseTrieNodeBranch(i, codec) - if err != nil { - return nil, err - } - default: - return nil, fmt.Errorf("unknown trie node type") - } - - return &TrieNode{ - nodeKind: nodeKind, - elements: elements, - rawdata: b, - cid: c, - }, nil -} - -// decodeCompactKey takes a compact key, and returns its nodeKind and value. -func decodeCompactKey(i []interface{}) (string, []interface{}, error) { - first := i[0].([]byte) - last := i[1].([]byte) - - switch first[0] / 16 { - case '\x00': - return "extension", []interface{}{ - nibbleToByte(first)[2:], - last, - }, nil - case '\x01': - return "extension", []interface{}{ - nibbleToByte(first)[1:], - last, - }, nil - case '\x02': - return "leaf", []interface{}{ - nibbleToByte(first)[2:], - last, - }, nil - case '\x03': - return "leaf", []interface{}{ - nibbleToByte(first)[1:], - last, - }, nil - default: - return "", nil, fmt.Errorf("unknown hex prefix") - } -} - -// parseTrieNodeExtension helper improves readability -func parseTrieNodeExtension(i []interface{}, codec uint64) ([]interface{}, error) { - return []interface{}{ - i[0].([]byte), - keccak256ToCid(codec, i[1].([]byte)), - }, nil -} - -// parseTrieNodeBranch helper improves readability -func parseTrieNodeBranch(i []interface{}, codec uint64) ([]interface{}, error) { - var out []interface{} - - for _, vi := range i { - v := vi.([]byte) - - switch len(v) { - case 0: - out = append(out, nil) - case 32: - out = append(out, keccak256ToCid(codec, v)) - default: - return nil, fmt.Errorf("unrecognized object: %v", v) - } - } - - return out, nil -} - -/* - Node INTERFACE -*/ - -// Resolve resolves a path through this node, stopping at any link boundary -// and returning the object found as well as the remaining path to traverse -func (t *TrieNode) Resolve(p []string) (interface{}, []string, error) { - switch t.nodeKind { - case "extension": - return t.resolveTrieNodeExtension(p) - case "leaf": - return t.resolveTrieNodeLeaf(p) - case "branch": - return t.resolveTrieNodeBranch(p) - default: - return nil, nil, fmt.Errorf("nodeKind case not implemented") - } -} - -// Tree lists all paths within the object under 'path', and up to the given depth. -// To list the entire object (similar to `find .`) pass "" and -1 -func (t *TrieNode) Tree(p string, depth int) []string { - if p != "" || depth == 0 { - return nil - } - - var out []string - - switch t.nodeKind { - case "extension": - var val string - for _, e := range t.elements[0].([]byte) { - val += fmt.Sprintf("%x", e) - } - return []string{val} - case "branch": - for i, elem := range t.elements { - if _, ok := elem.(*cid.Cid); ok { - out = append(out, fmt.Sprintf("%x", i)) - } - } - return out - - default: - return nil - } -} - -// ResolveLink is a helper function that calls resolve and asserts the -// output is a link -func (t *TrieNode) ResolveLink(p []string) (*node.Link, []string, error) { - obj, rest, err := t.Resolve(p) - if err != nil { - return nil, nil, err - } - - lnk, ok := obj.(*node.Link) - if !ok { - return nil, nil, fmt.Errorf("was not a link") - } - - return lnk, rest, nil -} - -// Copy will go away. It is here to comply with the interface. -func (t *TrieNode) Copy() node.Node { - panic("dont use this yet") -} - -// Links is a helper function that returns all links within this object -func (t *TrieNode) Links() []*node.Link { - var out []*node.Link - - for _, i := range t.elements { - c, ok := i.(cid.Cid) - if ok { - out = append(out, &node.Link{Cid: c}) - } - } - - return out -} - -// Stat will go away. It is here to comply with the interface. -func (t *TrieNode) Stat() (*node.NodeStat, error) { - return &node.NodeStat{}, nil -} - -// Size will go away. It is here to comply with the interface. -func (t *TrieNode) Size() (uint64, error) { - return 0, nil -} - -/* - TrieNode functions -*/ - -// MarshalJSON processes the transaction trie into readable JSON format. -func (t *TrieNode) MarshalJSON() ([]byte, error) { - var out map[string]interface{} - - switch t.nodeKind { - case "extension": - fallthrough - case "leaf": - var hexPrefix string - for _, e := range t.elements[0].([]byte) { - hexPrefix += fmt.Sprintf("%x", e) - } - - // if we got a byte we need to do this casting otherwise - // it will be marshaled to a base64 encoded value - if _, ok := t.elements[1].([]byte); ok { - var hexVal string - for _, e := range t.elements[1].([]byte) { - hexVal += fmt.Sprintf("%x", e) - } - - t.elements[1] = hexVal - } - - out = map[string]interface{}{ - "type": t.nodeKind, - hexPrefix: t.elements[1], - } - - case "branch": - out = map[string]interface{}{ - "type": "branch", - "0": t.elements[0], - "1": t.elements[1], - "2": t.elements[2], - "3": t.elements[3], - "4": t.elements[4], - "5": t.elements[5], - "6": t.elements[6], - "7": t.elements[7], - "8": t.elements[8], - "9": t.elements[9], - "a": t.elements[10], - "b": t.elements[11], - "c": t.elements[12], - "d": t.elements[13], - "e": t.elements[14], - "f": t.elements[15], - } - default: - return nil, fmt.Errorf("nodeKind %s not supported", t.nodeKind) - } - - return json.Marshal(out) -} - -// nibbleToByte expands the nibbles of a byte slice into their own bytes. -func nibbleToByte(k []byte) []byte { - var out []byte - - for _, b := range k { - out = append(out, b/16) - out = append(out, b%16) - } - - return out -} - -// Resolve reading conveniences -func (t *TrieNode) resolveTrieNodeExtension(p []string) (interface{}, []string, error) { - nibbles := t.elements[0].([]byte) - idx, rest := shiftFromPath(p, len(nibbles)) - if len(idx) < len(nibbles) { - return nil, nil, fmt.Errorf("not enough nibbles to traverse this extension") - } - - for _, i := range idx { - if getHexIndex(string(i)) == -1 { - return nil, nil, fmt.Errorf("invalid path element") - } - } - - for i, n := range nibbles { - if string(idx[i]) != fmt.Sprintf("%x", n) { - return nil, nil, fmt.Errorf("no such link in this extension") - } - } - - return &node.Link{Cid: t.elements[1].(cid.Cid)}, rest, nil -} - -func (t *TrieNode) resolveTrieNodeLeaf(p []string) (interface{}, []string, error) { - nibbles := t.elements[0].([]byte) - - if len(nibbles) != 0 { - idx, rest := shiftFromPath(p, len(nibbles)) - if len(idx) < len(nibbles) { - return nil, nil, fmt.Errorf("not enough nibbles to traverse this leaf") - } - - for _, i := range idx { - if getHexIndex(string(i)) == -1 { - return nil, nil, fmt.Errorf("invalid path element") - } - } - - for i, n := range nibbles { - if string(idx[i]) != fmt.Sprintf("%x", n) { - return nil, nil, fmt.Errorf("no such link in this extension") - } - } - - p = rest - } - - link, ok := t.elements[1].(node.Node) - if !ok { - return nil, nil, fmt.Errorf("leaf children is not an IPLD node") - } - - return link.Resolve(p) -} - -func (t *TrieNode) resolveTrieNodeBranch(p []string) (interface{}, []string, error) { - idx, rest := shiftFromPath(p, 1) - hidx := getHexIndex(idx) - if hidx == -1 { - return nil, nil, fmt.Errorf("incorrect path") - } - - child := t.elements[hidx] - if child != nil { - return &node.Link{Cid: child.(cid.Cid)}, rest, nil - } - return nil, nil, fmt.Errorf("no such link in this branch") -} - -// shiftFromPath extracts from a given path (as a slice of strings) -// the given number of elements as a single string, returning whatever -// it has not taken. -// -// Examples: -// ["0", "a", "something"] and 1 -> "0" and ["a", "something"] -// ["ab", "c", "d", "1"] and 2 -> "ab" and ["c", "d", "1"] -// ["abc", "d", "1"] and 2 -> "ab" and ["c", "d", "1"] -func shiftFromPath(p []string, i int) (string, []string) { - var ( - out string - rest []string - ) - - for _, pe := range p { - re := "" - for _, c := range pe { - if len(out) < i { - out += string(c) - } else { - re += string(c) - } - } - - if len(out) == i && re != "" { - rest = append(rest, re) - } - } - - return out, rest -} - -// getHexIndex returns to you the integer 0 - 15 equivalent to your -// string character if applicable, or -1 otherwise. -func getHexIndex(s string) int { - if len(s) != 1 { - return -1 - } - - c := byte(s[0]) - switch { - case '0' <= c && c <= '9': - return int(c - '0') - case 'a' <= c && c <= 'f': - return int(c - 'a' + 10) - } - - return -1 -} diff --git a/pkg/super_node/eth/streamer.go b/pkg/super_node/eth/streamer.go index 17a8f8e5..123bc911 100644 --- a/pkg/super_node/eth/streamer.go +++ b/pkg/super_node/eth/streamer.go @@ -17,10 +17,11 @@ package eth import ( + "github.com/ethereum/go-ethereum/statediff" "github.com/sirupsen/logrus" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/eth/core" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) const ( @@ -42,6 +43,15 @@ func NewPayloadStreamer(client core.RPCClient) *PayloadStreamer { // Stream is the main loop for subscribing to data from the Geth state diff process // Satisfies the shared.PayloadStreamer interface func (ps *PayloadStreamer) Stream(payloadChan chan shared.RawChainData) (shared.ClientSubscription, error) { + stateDiffChan := make(chan statediff.Payload, PayloadChanBufferSize) logrus.Info("streaming diffs from geth") - return ps.Client.Subscribe("statediff", payloadChan, "stream") + go func() { + for { + select { + case payload := <-stateDiffChan: + payloadChan <- payload + } + } + }() + return ps.Client.Subscribe("statediff", stateDiffChan, "stream") } diff --git a/pkg/super_node/eth/streamer_test.go b/pkg/super_node/eth/streamer_test.go index c139c392..0e47041c 100644 --- a/pkg/super_node/eth/streamer_test.go +++ b/pkg/super_node/eth/streamer_test.go @@ -17,10 +17,10 @@ package eth_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/eth/fakes" "github.com/vulcanize/vulcanizedb/pkg/super_node/eth" + "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) var _ = Describe("StateDiff Streamer", func() { @@ -30,6 +30,5 @@ var _ = Describe("StateDiff Streamer", func() { payloadChan := make(chan shared.RawChainData) _, err := streamer.Stream(payloadChan) Expect(err).NotTo(HaveOccurred()) - client.AssertSubscribeCalledWith("statediff", payloadChan, []interface{}{"stream"}) }) })