diff --git a/pkg/ipfs/dag_putters/btc_header.go b/pkg/ipfs/dag_putters/btc_header.go index bf938aee..b35581b5 100644 --- a/pkg/ipfs/dag_putters/btc_header.go +++ b/pkg/ipfs/dag_putters/btc_header.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,13 +33,13 @@ func NewBtcHeaderDagPutter(adder *ipfs.IPFS) *BtcHeaderDagPutter { return &BtcHeaderDagPutter{adder: adder} } -func (bhdp *BtcHeaderDagPutter) DagPut(raw interface{}) ([]string, error) { - header, ok := raw.(*ipld.BtcHeader) +func (bhdp *BtcHeaderDagPutter) DagPut(n node.Node) (string, error) { + header, ok := n.(*ipld.BtcHeader) if !ok { - return nil, fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, raw) + return "", fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, n) } if err := bhdp.adder.Add(header); err != nil { - return nil, err + return "", err } - return []string{header.Cid().String()}, nil + return header.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/btc_tx.go b/pkg/ipfs/dag_putters/btc_tx.go index 8d38f675..a958d781 100644 --- a/pkg/ipfs/dag_putters/btc_tx.go +++ b/pkg/ipfs/dag_putters/btc_tx.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,17 +33,13 @@ func NewBtcTxDagPutter(adder *ipfs.IPFS) *BtcTxDagPutter { return &BtcTxDagPutter{adder: adder} } -func (etdp *BtcTxDagPutter) DagPut(raw interface{}) ([]string, error) { - transactions, ok := raw.([]*ipld.BtcTx) +func (etdp *BtcTxDagPutter) DagPut(n node.Node) (string, error) { + transaction, ok := n.(*ipld.BtcTx) if !ok { - return nil, fmt.Errorf("BtcTxDagPutter expected input type %T got %T", []*ipld.BtcTx{}, raw) + return "", fmt.Errorf("BtcTxDagPutter expected input type %T got %T", &ipld.BtcTx{}, n) } - cids := make([]string, len(transactions)) - for i, transaction := range transactions { - if err := etdp.adder.Add(transaction); err != nil { - return nil, err - } - cids[i] = transaction.Cid().String() + if err := etdp.adder.Add(transaction); err != nil { + return "", err } - return cids, nil + return transaction.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/btc_tx_trie.go b/pkg/ipfs/dag_putters/btc_tx_trie.go new file mode 100644 index 00000000..446e8d49 --- /dev/null +++ b/pkg/ipfs/dag_putters/btc_tx_trie.go @@ -0,0 +1,45 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package dag_putters + +import ( + "fmt" + + node "github.com/ipfs/go-ipld-format" + + "github.com/vulcanize/vulcanizedb/pkg/ipfs" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" +) + +type BtcTxTrieDagPutter struct { + adder *ipfs.IPFS +} + +func NewBtcTxTrieDagPutter(adder *ipfs.IPFS) *BtcTxTrieDagPutter { + return &BtcTxTrieDagPutter{adder: adder} +} + +func (etdp *BtcTxTrieDagPutter) DagPut(n node.Node) (string, error) { + txTrieNode, ok := n.(*ipld.BtcTxTrie) + if !ok { + return "", fmt.Errorf("BtcTxTrieDagPutter expected input type %T got %T", &ipld.BtcTxTrie{}, n) + } + if err := etdp.adder.Add(txTrieNode); err != nil { + return "", err + } + return txTrieNode.Cid().String(), nil +} diff --git a/pkg/ipfs/dag_putters/eth_header.go b/pkg/ipfs/dag_putters/eth_header.go index 191b577c..3f38c650 100644 --- a/pkg/ipfs/dag_putters/eth_header.go +++ b/pkg/ipfs/dag_putters/eth_header.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,13 +33,13 @@ func NewEthBlockHeaderDagPutter(adder *ipfs.IPFS) *EthHeaderDagPutter { return &EthHeaderDagPutter{adder: adder} } -func (bhdp *EthHeaderDagPutter) DagPut(raw interface{}) ([]string, error) { - header, ok := raw.(*ipld.EthHeader) +func (bhdp *EthHeaderDagPutter) DagPut(n node.Node) (string, error) { + header, ok := n.(*ipld.EthHeader) if !ok { - return nil, fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, raw) + return "", fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, n) } if err := bhdp.adder.Add(header); err != nil { - return nil, err + return "", err } - return []string{header.Cid().String()}, nil + return header.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_receipt.go b/pkg/ipfs/dag_putters/eth_receipt.go index ad40283e..0caa7f10 100644 --- a/pkg/ipfs/dag_putters/eth_receipt.go +++ b/pkg/ipfs/dag_putters/eth_receipt.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,17 +33,13 @@ func NewEthReceiptDagPutter(adder *ipfs.IPFS) *EthReceiptDagPutter { return &EthReceiptDagPutter{adder: adder} } -func (erdp *EthReceiptDagPutter) DagPut(raw interface{}) ([]string, error) { - receipts, ok := raw.([]*ipld.EthReceipt) +func (erdp *EthReceiptDagPutter) DagPut(n node.Node) (string, error) { + receipt, ok := n.(*ipld.EthReceipt) if !ok { - return nil, fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", []*ipld.EthReceipt{}, raw) + return "", fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", &ipld.EthReceipt{}, n) } - cids := make([]string, len(receipts)) - for i, receipt := range receipts { - if err := erdp.adder.Add(receipt); err != nil { - return nil, err - } - cids[i] = receipt.Cid().String() + if err := erdp.adder.Add(receipt); err != nil { + return "", err } - return cids, nil + return receipt.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_receipt_trie.go b/pkg/ipfs/dag_putters/eth_receipt_trie.go index a8f22865..697e6ef2 100644 --- a/pkg/ipfs/dag_putters/eth_receipt_trie.go +++ b/pkg/ipfs/dag_putters/eth_receipt_trie.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,17 +33,13 @@ func NewEthRctTrieDagPutter(adder *ipfs.IPFS) *EthRctTrieDagPutter { return &EthRctTrieDagPutter{adder: adder} } -func (etdp *EthRctTrieDagPutter) DagPut(raw interface{}) ([]string, error) { - rctTrieNodes, ok := raw.([]*ipld.EthRctTrie) +func (etdp *EthRctTrieDagPutter) DagPut(n node.Node) (string, error) { + rctTrieNode, ok := n.(*ipld.EthRctTrie) if !ok { - return nil, fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", []*ipld.EthRctTrie{}, raw) + return "", fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", &ipld.EthRctTrie{}, n) } - cids := make([]string, len(rctTrieNodes)) - for i, rctNode := range rctTrieNodes { - if err := etdp.adder.Add(rctNode); err != nil { - return nil, err - } - cids[i] = rctNode.Cid().String() + if err := etdp.adder.Add(rctTrieNode); err != nil { + return "", err } - return cids, nil + return rctTrieNode.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_state.go b/pkg/ipfs/dag_putters/eth_state.go index f32fe9b7..b435db28 100644 --- a/pkg/ipfs/dag_putters/eth_state.go +++ b/pkg/ipfs/dag_putters/eth_state.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,13 +33,13 @@ func NewEthStateDagPutter(adder *ipfs.IPFS) *EthStateDagPutter { return &EthStateDagPutter{adder: adder} } -func (erdp *EthStateDagPutter) DagPut(raw interface{}) ([]string, error) { - stateNode, ok := raw.(*ipld.EthStateTrie) +func (erdp *EthStateDagPutter) DagPut(n node.Node) (string, error) { + stateNode, ok := n.(*ipld.EthStateTrie) if !ok { - return nil, fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, raw) + return "", fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, n) } if err := erdp.adder.Add(stateNode); err != nil { - return nil, err + return "", err } - return []string{stateNode.Cid().String()}, nil + return stateNode.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_storage.go b/pkg/ipfs/dag_putters/eth_storage.go index bb0930d9..fce3fbd4 100644 --- a/pkg/ipfs/dag_putters/eth_storage.go +++ b/pkg/ipfs/dag_putters/eth_storage.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,13 +33,13 @@ func NewEthStorageDagPutter(adder *ipfs.IPFS) *EthStorageDagPutter { return &EthStorageDagPutter{adder: adder} } -func (erdp *EthStorageDagPutter) DagPut(raw interface{}) ([]string, error) { - storageNode, ok := raw.(*ipld.EthStorageTrie) +func (erdp *EthStorageDagPutter) DagPut(n node.Node) (string, error) { + storageNode, ok := n.(*ipld.EthStorageTrie) if !ok { - return nil, fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, raw) + return "", fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, n) } if err := erdp.adder.Add(storageNode); err != nil { - return nil, err + return "", err } - return []string{storageNode.Cid().String()}, nil + return storageNode.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_tx.go b/pkg/ipfs/dag_putters/eth_tx.go index 65a3cb63..51af68d9 100644 --- a/pkg/ipfs/dag_putters/eth_tx.go +++ b/pkg/ipfs/dag_putters/eth_tx.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,17 +33,13 @@ func NewEthTxsDagPutter(adder *ipfs.IPFS) *EthTxsDagPutter { return &EthTxsDagPutter{adder: adder} } -func (etdp *EthTxsDagPutter) DagPut(raw interface{}) ([]string, error) { - transactions, ok := raw.([]*ipld.EthTx) +func (etdp *EthTxsDagPutter) DagPut(n node.Node) (string, error) { + transaction, ok := n.(*ipld.EthTx) if !ok { - return nil, fmt.Errorf("EthTxsDagPutter expected input type %T got %T", []*ipld.EthTx{}, raw) + return "", fmt.Errorf("EthTxsDagPutter expected input type %T got %T", &ipld.EthTx{}, n) } - cids := make([]string, len(transactions)) - for i, transaction := range transactions { - if err := etdp.adder.Add(transaction); err != nil { - return nil, err - } - cids[i] = transaction.Cid().String() + if err := etdp.adder.Add(transaction); err != nil { + return "", err } - return cids, nil + return transaction.Cid().String(), nil } diff --git a/pkg/ipfs/dag_putters/eth_tx_trie.go b/pkg/ipfs/dag_putters/eth_tx_trie.go index c805ad9a..12320214 100644 --- a/pkg/ipfs/dag_putters/eth_tx_trie.go +++ b/pkg/ipfs/dag_putters/eth_tx_trie.go @@ -19,6 +19,8 @@ package dag_putters import ( "fmt" + node "github.com/ipfs/go-ipld-format" + "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" ) @@ -31,17 +33,13 @@ func NewEthTxTrieDagPutter(adder *ipfs.IPFS) *EthTxTrieDagPutter { return &EthTxTrieDagPutter{adder: adder} } -func (etdp *EthTxTrieDagPutter) DagPut(raw interface{}) ([]string, error) { - txTrieNodes, ok := raw.([]*ipld.EthTxTrie) +func (etdp *EthTxTrieDagPutter) DagPut(n node.Node) (string, error) { + txTrieNode, ok := n.(*ipld.EthTxTrie) if !ok { - return nil, fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", []*ipld.EthTxTrie{}, raw) + return "", fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", &ipld.EthTxTrie{}, n) } - cids := make([]string, len(txTrieNodes)) - for i, txNode := range txTrieNodes { - if err := etdp.adder.Add(txNode); err != nil { - return nil, err - } - cids[i] = txNode.Cid().String() + if err := etdp.adder.Add(txTrieNode); err != nil { + return "", err } - return cids, nil + return txTrieNode.Cid().String(), nil } diff --git a/pkg/ipfs/ipld/btc_block.go b/pkg/ipfs/ipld/btc_parser.go similarity index 100% rename from pkg/ipfs/ipld/btc_block.go rename to pkg/ipfs/ipld/btc_parser.go diff --git a/pkg/ipfs/ipld/eth_block.go b/pkg/ipfs/ipld/eth_parser.go similarity index 76% rename from pkg/ipfs/ipld/eth_block.go rename to pkg/ipfs/ipld/eth_parser.go index 3e68cfee..f02d7d40 100644 --- a/pkg/ipfs/ipld/eth_block.go +++ b/pkg/ipfs/ipld/eth_parser.go @@ -21,35 +21,35 @@ import ( "fmt" "github.com/ethereum/go-ethereum/core/types" - mh "github.com/multiformats/go-multihash" ) -// FromBlock takes a block and processes it +// FromBlockAndReceipts takes a block and processes it // to return it a set of IPLD nodes for further processing. -func FromBlock(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthTx, []*EthTxTrie, []*EthReceipt, []*EthRctTrie, error) { - // process the eth-header object - headerRawData := getRLP(block.Header()) - cid, err := RawdataToCid(MEthHeader, headerRawData, mh.KECCAK_256) +func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthHeader, []*EthTx, []*EthTxTrie, []*EthReceipt, []*EthRctTrie, error) { + // Process the header + headerNode, err := NewEthHeader(block.Header()) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } - ethHeader := &EthHeader{ - Header: block.Header(), - cid: cid, - rawdata: headerRawData, + // Process the uncles + uncleNodes := make([]*EthHeader, len(block.Uncles())) + for i, uncle := range block.Uncles() { + uncleNode, err := NewEthHeader(uncle) + if err != nil { + return nil, nil, nil, nil, nil, nil, err + } + uncleNodes[i] = uncleNode } - - // Process the found eth-tx objects + // Process the txs ethTxNodes, ethTxTrieNodes, err := processTransactions(block.Transactions(), block.Header().TxHash[:]) if err != nil { - return nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, err } - // process the eth-rct objects + // Process the receipts ethRctNodes, ethRctTrieNodes, err := processReceipts(receipts, block.Header().ReceiptHash[:]) - - return ethHeader, ethTxNodes, ethTxTrieNodes, ethRctNodes, ethRctTrieNodes, nil + return headerNode, uncleNodes, ethTxNodes, ethTxTrieNodes, ethRctNodes, ethRctTrieNodes, err } // processTransactions will take the found transactions in a parsed block body diff --git a/pkg/ipfs/ipld/shared.go b/pkg/ipfs/ipld/shared.go index fe6293f7..e8358f7b 100644 --- a/pkg/ipfs/ipld/shared.go +++ b/pkg/ipfs/ipld/shared.go @@ -132,6 +132,9 @@ func (lt *localTrie) add(idx int, rawdata []byte) { panic(err) } lt.keys = append(lt.keys, key) + if err := lt.db.Put(key, rawdata); err != nil { + panic(err) + } lt.trie.Update(key, rawdata) } diff --git a/pkg/ipfs/mocks/dag_putters.go b/pkg/ipfs/mocks/dag_putters.go index 2c2cb8bc..dd7a9380 100644 --- a/pkg/ipfs/mocks/dag_putters.go +++ b/pkg/ipfs/mocks/dag_putters.go @@ -19,39 +19,35 @@ package mocks import ( "errors" + node "github.com/ipfs/go-ipld-format" + "github.com/ethereum/go-ethereum/common" ) // DagPutter is a mock for testing the ipfs publisher type DagPutter struct { - CIDsToReturn []string - PassedRaw interface{} - ErrToReturn error + PassedNode node.Node + ErrToReturn error } // DagPut returns the pre-loaded CIDs or error -func (dp *DagPutter) DagPut(raw interface{}) ([]string, error) { - dp.PassedRaw = raw - return dp.CIDsToReturn, dp.ErrToReturn +func (dp *DagPutter) DagPut(n node.Node) (string, error) { + dp.PassedNode = n + return n.Cid().String(), dp.ErrToReturn } // MappedDagPutter is a mock for testing the ipfs publisher type MappedDagPutter struct { - CIDsToReturn map[common.Hash][]string - PassedRaw interface{} + CIDsToReturn map[common.Hash]string + PassedNode node.Node ErrToReturn error } // DagPut returns the pre-loaded CIDs or error -func (mdp *MappedDagPutter) DagPut(raw interface{}) ([]string, error) { - mdp.PassedRaw = raw +func (mdp *MappedDagPutter) DagPut(n node.Node) (string, error) { if mdp.CIDsToReturn == nil { - return nil, errors.New("mapped dag putter needs to be initialized with a map of cids to return") + return "", errors.New("mapped dag putter needs to be initialized with a map of cids to return") } - by, ok := raw.([]byte) - if !ok { - return nil, errors.New("mapped dag putters can only dag put []byte values") - } - hash := common.BytesToHash(by) + hash := common.BytesToHash(n.RawData()) return mdp.CIDsToReturn[hash], nil } diff --git a/pkg/super_node/btc/publisher.go b/pkg/super_node/btc/publisher.go index 6805dbc6..e8ffc051 100644 --- a/pkg/super_node/btc/publisher.go +++ b/pkg/super_node/btc/publisher.go @@ -17,22 +17,20 @@ package btc import ( - "errors" "fmt" "strconv" - "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcutil" - "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter + HeaderPutter shared.DagPutter + TransactionPutter shared.DagPutter + TransactionTriePutter shared.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface @@ -42,8 +40,9 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { return nil, err } return &IPLDPublisher{ - HeaderPutter: dag_putters.NewBtcHeaderDagPutter(node), - TransactionPutter: dag_putters.NewBtcTxDagPutter(node), + HeaderPutter: dag_putters.NewBtcHeaderDagPutter(node), + TransactionPutter: dag_putters.NewBtcTxDagPutter(node), + TransactionTriePutter: dag_putters.NewBtcTxTrieDagPutter(node), }, nil } @@ -53,8 +52,13 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI if !ok { return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload) } + // Generate nodes + headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs) + if err != nil { + return nil, err + } // Process and publish headers - headerCid, err := pub.publishHeader(ipldPayload.Header) + headerCid, err := pub.publishHeader(headerNode) if err != nil { return nil, err } @@ -67,7 +71,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI Bits: ipldPayload.Header.Bits, } // Process and publish transactions - transactionCids, err := pub.publishTransactions(ipldPayload.Txs, ipldPayload.TxMetaData) + transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData) if err != nil { return nil, err } @@ -78,25 +82,22 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI }, nil } -func (pub *IPLDPublisher) publishHeader(header *wire.BlockHeader) (string, error) { - cids, err := pub.HeaderPutter.DagPut(header) +func (pub *IPLDPublisher) publishHeader(header *ipld.BtcHeader) (string, error) { + cid, err := pub.HeaderPutter.DagPut(header) if err != nil { return "", err } - return cids[0], nil + return cid, nil } -func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) { - transactionCids, err := pub.TransactionPutter.DagPut(transactions) - if err != nil { - return nil, err - } - if len(transactionCids) != len(trxMeta) { - return nil, errors.New("expected one CID for each transaction") - } - mappedTrxCids := make([]TxModelWithInsAndOuts, len(transactionCids)) - for i, cid := range transactionCids { - mappedTrxCids[i] = TxModelWithInsAndOuts{ +func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.BtcTx, txTrie []*ipld.BtcTxTrie, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) { + txCids := make([]TxModelWithInsAndOuts, len(transactions)) + for i, tx := range transactions { + cid, err := pub.TransactionPutter.DagPut(tx) + if err != nil { + return nil, err + } + txCids[i] = TxModelWithInsAndOuts{ CID: cid, Index: trxMeta[i].Index, TxHash: trxMeta[i].TxHash, @@ -106,5 +107,11 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMet TxOutputs: trxMeta[i].TxOutputs, } } - return mappedTrxCids, nil + for _, txNode := range txTrie { + // We don't do anything with the tx trie cids atm + if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil { + return nil, err + } + } + return txCids, nil } diff --git a/pkg/super_node/btc/publisher_test.go b/pkg/super_node/btc/publisher_test.go index 5726c44e..5d92286a 100644 --- a/pkg/super_node/btc/publisher_test.go +++ b/pkg/super_node/btc/publisher_test.go @@ -17,6 +17,9 @@ package btc_test import ( + "bytes" + + "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -26,23 +29,45 @@ import ( ) var ( - mockHeaderDagPutter *mocks2.DagPutter - mockTrxDagPutter *mocks2.DagPutter + mockHeaderDagPutter *mocks2.MappedDagPutter + mockTrxDagPutter *mocks2.MappedDagPutter + mockTrxTrieDagPutter *mocks2.DagPutter ) var _ = Describe("Publisher", func() { BeforeEach(func() { - mockHeaderDagPutter = new(mocks2.DagPutter) - mockTrxDagPutter = new(mocks2.DagPutter) + mockHeaderDagPutter = new(mocks2.MappedDagPutter) + mockTrxDagPutter = new(mocks2.MappedDagPutter) + mockTrxTrieDagPutter = new(mocks2.DagPutter) }) Describe("Publish", func() { It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { - mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} - mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2", "mockTrxCID3"} + by := new(bytes.Buffer) + err := mocks.MockConvertedPayload.BlockPayload.Header.Serialize(by) + Expect(err).ToNot(HaveOccurred()) + headerBytes := by.Bytes() + err = mocks.MockTransactions[0].MsgTx().Serialize(by) + Expect(err).ToNot(HaveOccurred()) + tx1Bytes := by.Bytes() + err = mocks.MockTransactions[1].MsgTx().Serialize(by) + Expect(err).ToNot(HaveOccurred()) + tx2Bytes := by.Bytes() + err = mocks.MockTransactions[2].MsgTx().Serialize(by) + Expect(err).ToNot(HaveOccurred()) + tx3Bytes := by.Bytes() + mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(headerBytes): "mockHeaderCID", + } + mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(tx1Bytes): "mockTrxCID1", + common.BytesToHash(tx2Bytes): "mockTrxCID2", + common.BytesToHash(tx3Bytes): "mockTrxCID3", + } publisher := btc.IPLDPublisher{ - HeaderPutter: mockHeaderDagPutter, - TransactionPutter: mockTrxDagPutter, + HeaderPutter: mockHeaderDagPutter, + TransactionPutter: mockTrxDagPutter, + TransactionTriePutter: mockTrxTrieDagPutter, } payload, err := publisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/super_node/eth/publisher.go b/pkg/super_node/eth/publisher.go index 60b0d76e..b2c76a79 100644 --- a/pkg/super_node/eth/publisher.go +++ b/pkg/super_node/eth/publisher.go @@ -17,7 +17,6 @@ package eth import ( - "errors" "fmt" "github.com/ethereum/go-ethereum/common" @@ -26,16 +25,19 @@ import ( common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters" + "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared" ) // IPLDPublisher satisfies the IPLDPublisher for ethereum type IPLDPublisher struct { - HeaderPutter shared.DagPutter - TransactionPutter shared.DagPutter - ReceiptPutter shared.DagPutter - StatePutter shared.DagPutter - StoragePutter shared.DagPutter + HeaderPutter shared.DagPutter + TransactionPutter shared.DagPutter + TransactionTriePutter shared.DagPutter + ReceiptPutter shared.DagPutter + ReceiptTriePutter shared.DagPutter + StatePutter shared.DagPutter + StoragePutter shared.DagPutter } // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface @@ -45,11 +47,13 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { return nil, err } return &IPLDPublisher{ - HeaderPutter: dag_putters.NewEthBlockHeaderDagPutter(node), - TransactionPutter: dag_putters.NewEthTxsDagPutter(node), - ReceiptPutter: dag_putters.NewEthReceiptDagPutter(node), - StatePutter: dag_putters.NewEthStateDagPutter(node), - StoragePutter: dag_putters.NewEthStorageDagPutter(node), + HeaderPutter: dag_putters.NewEthBlockHeaderDagPutter(node), + TransactionPutter: dag_putters.NewEthTxsDagPutter(node), + TransactionTriePutter: dag_putters.NewEthTxTrieDagPutter(node), + ReceiptPutter: dag_putters.NewEthReceiptDagPutter(node), + ReceiptTriePutter: dag_putters.NewEthRctTrieDagPutter(node), + StatePutter: dag_putters.NewEthStateDagPutter(node), + StoragePutter: dag_putters.NewEthStorageDagPutter(node), }, nil } @@ -59,8 +63,14 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI if !ok { return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) } + // Generate the nodes for publishing + headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) + if err != nil { + return nil, err + } + // Process and publish headers - headerCid, err := pub.publishHeader(ipldPayload.Block.Header()) + headerCid, err := pub.publishHeader(headerNode) if err != nil { return nil, err } @@ -75,29 +85,29 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI } // Process and publish uncles - uncleCids := make([]UncleModel, 0, len(ipldPayload.Block.Uncles())) - for _, uncle := range ipldPayload.Block.Uncles() { + uncleCids := make([]UncleModel, len(uncleNodes)) + for i, uncle := range uncleNodes { uncleCid, err := pub.publishHeader(uncle) if err != nil { return nil, err } uncleReward := common2.CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64()) - uncleCids = append(uncleCids, UncleModel{ + uncleCids[i] = UncleModel{ CID: uncleCid, ParentHash: uncle.ParentHash.String(), BlockHash: uncle.Hash().String(), Reward: uncleReward.String(), - }) + } } // Process and publish transactions - transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body().Transactions, ipldPayload.TxMetaData) + transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData) if err != nil { return nil, err } // Process and publish receipts - receiptsCids, err := pub.publishReceipts(ipldPayload.Receipts, ipldPayload.ReceiptMetaData) + receiptsCids, err := pub.publishReceipts(rctNodes, rctTrieNodes, ipldPayload.ReceiptMetaData) if err != nil { return nil, err } @@ -125,25 +135,23 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI }, nil } -func (pub *IPLDPublisher) publishHeader(header *types.Header) (string, error) { - cids, err := pub.HeaderPutter.DagPut(header) - if err != nil { - return "", err - } - return cids[0], nil +func (pub *IPLDPublisher) generateBlockNodes(body *types.Block, receipts types.Receipts) (*ipld.EthHeader, + []*ipld.EthHeader, []*ipld.EthTx, []*ipld.EthTxTrie, []*ipld.EthReceipt, []*ipld.EthRctTrie, error) { + return ipld.FromBlockAndReceipts(body, receipts) } -func (pub *IPLDPublisher) publishTransactions(transactions types.Transactions, trxMeta []TxModel) ([]TxModel, error) { - transactionCids, err := pub.TransactionPutter.DagPut(transactions) - if err != nil { - return nil, err - } - if len(transactionCids) != len(trxMeta) { - return nil, errors.New("expected one CID for each transaction") - } - mappedTrxCids := make([]TxModel, len(transactionCids)) - for i, cid := range transactionCids { - mappedTrxCids[i] = TxModel{ +func (pub *IPLDPublisher) publishHeader(header *ipld.EthHeader) (string, error) { + return pub.HeaderPutter.DagPut(header) +} + +func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.EthTx, txTrie []*ipld.EthTxTrie, trxMeta []TxModel) ([]TxModel, error) { + trxCids := make([]TxModel, len(transactions)) + for i, tx := range transactions { + cid, err := pub.TransactionPutter.DagPut(tx) + if err != nil { + return nil, err + } + trxCids[i] = TxModel{ CID: cid, Index: trxMeta[i].Index, TxHash: trxMeta[i].TxHash, @@ -151,22 +159,24 @@ func (pub *IPLDPublisher) publishTransactions(transactions types.Transactions, t Dst: trxMeta[i].Dst, } } - return mappedTrxCids, nil + for _, txNode := range txTrie { + // We don't do anything with the tx trie cids atm + if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil { + return nil, err + } + } + return trxCids, nil } -func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts, receiptMeta []ReceiptModel) (map[common.Hash]ReceiptModel, error) { - receiptsCids, err := pub.ReceiptPutter.DagPut(receipts) - if err != nil { - return nil, err - } - if len(receiptsCids) != len(receipts) { - return nil, errors.New("expected one CID for each receipt") - } - // Map receipt cids to their transaction hashes - mappedRctCids := make(map[common.Hash]ReceiptModel, len(receiptsCids)) +func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTrie []*ipld.EthRctTrie, receiptMeta []ReceiptModel) (map[common.Hash]ReceiptModel, error) { + rctCids := make(map[common.Hash]ReceiptModel) for i, rct := range receipts { - mappedRctCids[rct.TxHash] = ReceiptModel{ - CID: receiptsCids[i], + cid, err := pub.ReceiptPutter.DagPut(rct) + if err != nil { + return nil, err + } + rctCids[rct.TxHash] = ReceiptModel{ + CID: cid, Contract: receiptMeta[i].Contract, Topic0s: receiptMeta[i].Topic0s, Topic1s: receiptMeta[i].Topic1s, @@ -174,21 +184,31 @@ func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts, receiptMeta [ Topic3s: receiptMeta[i].Topic3s, } } - return mappedRctCids, nil + for _, rctNode := range receiptTrie { + // We don't do anything with the rct trie cids atm + if _, err := pub.ReceiptTriePutter.DagPut(rctNode); err != nil { + return nil, err + } + } + return rctCids, nil } func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, error) { stateNodeCids := make([]StateNodeModel, 0, len(stateNodes)) - for _, node := range stateNodes { - cids, err := pub.StatePutter.DagPut(node.Value) + for _, stateNode := range stateNodes { + node, err := ipld.FromStateTrieRLP(stateNode.Value) + if err != nil { + return nil, err + } + cid, err := pub.StatePutter.DagPut(node) if err != nil { return nil, err } stateNodeCids = append(stateNodeCids, StateNodeModel{ - Path: node.Path, - StateKey: node.LeafKey.String(), - CID: cids[0], - NodeType: ResolveFromNodeType(node.Type), + Path: stateNode.Path, + StateKey: stateNode.LeafKey.String(), + CID: cid, + NodeType: ResolveFromNodeType(stateNode.Type), }) } return stateNodeCids, nil @@ -198,17 +218,21 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]Tri storageLeafCids := make(map[common.Hash][]StorageNodeModel) for pathHash, storageTrie := range storageNodes { storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie)) - for _, node := range storageTrie { - cids, err := pub.StoragePutter.DagPut(node.Value) + for _, storageNode := range storageTrie { + node, err := ipld.FromStorageTrieRLP(storageNode.Value) + if err != nil { + return nil, err + } + cid, err := pub.StoragePutter.DagPut(node) if err != nil { return nil, err } // Map storage node cids to their path hashes storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{ - Path: node.Path, - StorageKey: node.LeafKey.Hex(), - CID: cids[0], - NodeType: ResolveFromNodeType(node.Type), + Path: storageNode.Path, + StorageKey: storageNode.LeafKey.Hex(), + CID: cid, + NodeType: ResolveFromNodeType(storageNode.Type), }) } } diff --git a/pkg/super_node/eth/publisher_test.go b/pkg/super_node/eth/publisher_test.go index 0e4dd94a..62c517de 100644 --- a/pkg/super_node/eth/publisher_test.go +++ b/pkg/super_node/eth/publisher_test.go @@ -27,40 +27,54 @@ import ( ) var ( - mockHeaderDagPutter *mocks2.DagPutter - mockTrxDagPutter *mocks2.DagPutter - mockRctDagPutter *mocks2.DagPutter + mockHeaderDagPutter *mocks2.MappedDagPutter + mockTrxDagPutter *mocks2.MappedDagPutter + mockTrxTrieDagPutter *mocks2.DagPutter + mockRctDagPutter *mocks2.MappedDagPutter + mockRctTrieDagPutter *mocks2.DagPutter mockStateDagPutter *mocks2.MappedDagPutter - mockStorageDagPutter *mocks2.DagPutter + mockStorageDagPutter *mocks2.MappedDagPutter ) var _ = Describe("Publisher", func() { BeforeEach(func() { - mockHeaderDagPutter = new(mocks2.DagPutter) - mockTrxDagPutter = new(mocks2.DagPutter) - mockRctDagPutter = new(mocks2.DagPutter) + mockHeaderDagPutter = new(mocks2.MappedDagPutter) + mockTrxDagPutter = new(mocks2.MappedDagPutter) + mockTrxTrieDagPutter = new(mocks2.DagPutter) + mockRctDagPutter = new(mocks2.MappedDagPutter) + mockRctTrieDagPutter = new(mocks2.DagPutter) mockStateDagPutter = new(mocks2.MappedDagPutter) - mockStorageDagPutter = new(mocks2.DagPutter) + mockStorageDagPutter = new(mocks2.MappedDagPutter) }) Describe("Publish", func() { It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { - mockHeaderDagPutter.CIDsToReturn = []string{mocks.HeaderCID.String()} - mockTrxDagPutter.CIDsToReturn = []string{mocks.Trx1CID.String(), mocks.Trx2CID.String()} - mockRctDagPutter.CIDsToReturn = []string{mocks.Rct1CID.String(), mocks.Rct2CID.String()} - val1 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[0].Value) - val2 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[1].Value) - mockStateDagPutter.CIDsToReturn = map[common.Hash][]string{ - val1: {mocks.State1CID.String()}, - val2: {mocks.State2CID.String()}, + mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(mocks.HeaderIPLD.RawData()): mocks.HeaderCID.String(), + } + mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(mocks.Trx1IPLD.RawData()): mocks.Trx1CID.String(), + common.BytesToHash(mocks.Trx2IPLD.RawData()): mocks.Trx2CID.String(), + } + mockRctDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(mocks.Rct1IPLD.RawData()): mocks.Rct1CID.String(), + common.BytesToHash(mocks.Rct2IPLD.RawData()): mocks.Rct2CID.String(), + } + mockStateDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(mocks.State1IPLD.RawData()): mocks.State1CID.String(), + common.BytesToHash(mocks.State2IPLD.RawData()): mocks.State2CID.String(), + } + mockStorageDagPutter.CIDsToReturn = map[common.Hash]string{ + common.BytesToHash(mocks.StorageIPLD.RawData()): mocks.StorageCID.String(), } - mockStorageDagPutter.CIDsToReturn = []string{mocks.StorageCID.String()} publisher := eth.IPLDPublisher{ - HeaderPutter: mockHeaderDagPutter, - TransactionPutter: mockTrxDagPutter, - ReceiptPutter: mockRctDagPutter, - StatePutter: mockStateDagPutter, - StoragePutter: mockStorageDagPutter, + HeaderPutter: mockHeaderDagPutter, + TransactionPutter: mockTrxDagPutter, + TransactionTriePutter: mockTrxTrieDagPutter, + ReceiptPutter: mockRctDagPutter, + ReceiptTriePutter: mockRctTrieDagPutter, + StatePutter: mockStateDagPutter, + StoragePutter: mockStorageDagPutter, } payload, err := publisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/super_node/shared/intefaces.go b/pkg/super_node/shared/intefaces.go index 72940422..39393919 100644 --- a/pkg/super_node/shared/intefaces.go +++ b/pkg/super_node/shared/intefaces.go @@ -18,6 +18,8 @@ package shared import ( "math/big" + + node "github.com/ipfs/go-ipld-format" ) // PayloadStreamer streams chain-specific payloads to the provided channel @@ -71,7 +73,7 @@ type ClientSubscription interface { // DagPutter is a general interface for a dag putter type DagPutter interface { - DagPut(raw interface{}) ([]string, error) + DagPut(n node.Node) (string, error) } // Cleaner is for cleaning out data from the cache within the given ranges