adjust btc and eth publishers and publisher tests

This commit is contained in:
Ian Norden 2020-03-17 13:05:19 -05:00
parent 7eddf396a5
commit d47ba24373
19 changed files with 334 additions and 220 deletions

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,13 +33,13 @@ func NewBtcHeaderDagPutter(adder *ipfs.IPFS) *BtcHeaderDagPutter {
return &BtcHeaderDagPutter{adder: adder} return &BtcHeaderDagPutter{adder: adder}
} }
func (bhdp *BtcHeaderDagPutter) DagPut(raw interface{}) ([]string, error) { func (bhdp *BtcHeaderDagPutter) DagPut(n node.Node) (string, error) {
header, ok := raw.(*ipld.BtcHeader) header, ok := n.(*ipld.BtcHeader)
if !ok { if !ok {
return nil, fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, raw) return "", fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, n)
} }
if err := bhdp.adder.Add(header); err != nil { if err := bhdp.adder.Add(header); err != nil {
return nil, err return "", err
} }
return []string{header.Cid().String()}, nil return header.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,17 +33,13 @@ func NewBtcTxDagPutter(adder *ipfs.IPFS) *BtcTxDagPutter {
return &BtcTxDagPutter{adder: adder} return &BtcTxDagPutter{adder: adder}
} }
func (etdp *BtcTxDagPutter) DagPut(raw interface{}) ([]string, error) { func (etdp *BtcTxDagPutter) DagPut(n node.Node) (string, error) {
transactions, ok := raw.([]*ipld.BtcTx) transaction, ok := n.(*ipld.BtcTx)
if !ok { if !ok {
return nil, fmt.Errorf("BtcTxDagPutter expected input type %T got %T", []*ipld.BtcTx{}, raw) return "", fmt.Errorf("BtcTxDagPutter expected input type %T got %T", &ipld.BtcTx{}, n)
} }
cids := make([]string, len(transactions)) if err := etdp.adder.Add(transaction); err != nil {
for i, transaction := range transactions { return "", err
if err := etdp.adder.Add(transaction); err != nil {
return nil, err
}
cids[i] = transaction.Cid().String()
} }
return cids, nil return transaction.Cid().String(), nil
} }

View File

@ -0,0 +1,45 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package dag_putters
import (
"fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
)
type BtcTxTrieDagPutter struct {
adder *ipfs.IPFS
}
func NewBtcTxTrieDagPutter(adder *ipfs.IPFS) *BtcTxTrieDagPutter {
return &BtcTxTrieDagPutter{adder: adder}
}
func (etdp *BtcTxTrieDagPutter) DagPut(n node.Node) (string, error) {
txTrieNode, ok := n.(*ipld.BtcTxTrie)
if !ok {
return "", fmt.Errorf("BtcTxTrieDagPutter expected input type %T got %T", &ipld.BtcTxTrie{}, n)
}
if err := etdp.adder.Add(txTrieNode); err != nil {
return "", err
}
return txTrieNode.Cid().String(), nil
}

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,13 +33,13 @@ func NewEthBlockHeaderDagPutter(adder *ipfs.IPFS) *EthHeaderDagPutter {
return &EthHeaderDagPutter{adder: adder} return &EthHeaderDagPutter{adder: adder}
} }
func (bhdp *EthHeaderDagPutter) DagPut(raw interface{}) ([]string, error) { func (bhdp *EthHeaderDagPutter) DagPut(n node.Node) (string, error) {
header, ok := raw.(*ipld.EthHeader) header, ok := n.(*ipld.EthHeader)
if !ok { if !ok {
return nil, fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, raw) return "", fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, n)
} }
if err := bhdp.adder.Add(header); err != nil { if err := bhdp.adder.Add(header); err != nil {
return nil, err return "", err
} }
return []string{header.Cid().String()}, nil return header.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,17 +33,13 @@ func NewEthReceiptDagPutter(adder *ipfs.IPFS) *EthReceiptDagPutter {
return &EthReceiptDagPutter{adder: adder} return &EthReceiptDagPutter{adder: adder}
} }
func (erdp *EthReceiptDagPutter) DagPut(raw interface{}) ([]string, error) { func (erdp *EthReceiptDagPutter) DagPut(n node.Node) (string, error) {
receipts, ok := raw.([]*ipld.EthReceipt) receipt, ok := n.(*ipld.EthReceipt)
if !ok { if !ok {
return nil, fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", []*ipld.EthReceipt{}, raw) return "", fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", &ipld.EthReceipt{}, n)
} }
cids := make([]string, len(receipts)) if err := erdp.adder.Add(receipt); err != nil {
for i, receipt := range receipts { return "", err
if err := erdp.adder.Add(receipt); err != nil {
return nil, err
}
cids[i] = receipt.Cid().String()
} }
return cids, nil return receipt.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,17 +33,13 @@ func NewEthRctTrieDagPutter(adder *ipfs.IPFS) *EthRctTrieDagPutter {
return &EthRctTrieDagPutter{adder: adder} return &EthRctTrieDagPutter{adder: adder}
} }
func (etdp *EthRctTrieDagPutter) DagPut(raw interface{}) ([]string, error) { func (etdp *EthRctTrieDagPutter) DagPut(n node.Node) (string, error) {
rctTrieNodes, ok := raw.([]*ipld.EthRctTrie) rctTrieNode, ok := n.(*ipld.EthRctTrie)
if !ok { if !ok {
return nil, fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", []*ipld.EthRctTrie{}, raw) return "", fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", &ipld.EthRctTrie{}, n)
} }
cids := make([]string, len(rctTrieNodes)) if err := etdp.adder.Add(rctTrieNode); err != nil {
for i, rctNode := range rctTrieNodes { return "", err
if err := etdp.adder.Add(rctNode); err != nil {
return nil, err
}
cids[i] = rctNode.Cid().String()
} }
return cids, nil return rctTrieNode.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,13 +33,13 @@ func NewEthStateDagPutter(adder *ipfs.IPFS) *EthStateDagPutter {
return &EthStateDagPutter{adder: adder} return &EthStateDagPutter{adder: adder}
} }
func (erdp *EthStateDagPutter) DagPut(raw interface{}) ([]string, error) { func (erdp *EthStateDagPutter) DagPut(n node.Node) (string, error) {
stateNode, ok := raw.(*ipld.EthStateTrie) stateNode, ok := n.(*ipld.EthStateTrie)
if !ok { if !ok {
return nil, fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, raw) return "", fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, n)
} }
if err := erdp.adder.Add(stateNode); err != nil { if err := erdp.adder.Add(stateNode); err != nil {
return nil, err return "", err
} }
return []string{stateNode.Cid().String()}, nil return stateNode.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,13 +33,13 @@ func NewEthStorageDagPutter(adder *ipfs.IPFS) *EthStorageDagPutter {
return &EthStorageDagPutter{adder: adder} return &EthStorageDagPutter{adder: adder}
} }
func (erdp *EthStorageDagPutter) DagPut(raw interface{}) ([]string, error) { func (erdp *EthStorageDagPutter) DagPut(n node.Node) (string, error) {
storageNode, ok := raw.(*ipld.EthStorageTrie) storageNode, ok := n.(*ipld.EthStorageTrie)
if !ok { if !ok {
return nil, fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, raw) return "", fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, n)
} }
if err := erdp.adder.Add(storageNode); err != nil { if err := erdp.adder.Add(storageNode); err != nil {
return nil, err return "", err
} }
return []string{storageNode.Cid().String()}, nil return storageNode.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,17 +33,13 @@ func NewEthTxsDagPutter(adder *ipfs.IPFS) *EthTxsDagPutter {
return &EthTxsDagPutter{adder: adder} return &EthTxsDagPutter{adder: adder}
} }
func (etdp *EthTxsDagPutter) DagPut(raw interface{}) ([]string, error) { func (etdp *EthTxsDagPutter) DagPut(n node.Node) (string, error) {
transactions, ok := raw.([]*ipld.EthTx) transaction, ok := n.(*ipld.EthTx)
if !ok { if !ok {
return nil, fmt.Errorf("EthTxsDagPutter expected input type %T got %T", []*ipld.EthTx{}, raw) return "", fmt.Errorf("EthTxsDagPutter expected input type %T got %T", &ipld.EthTx{}, n)
} }
cids := make([]string, len(transactions)) if err := etdp.adder.Add(transaction); err != nil {
for i, transaction := range transactions { return "", err
if err := etdp.adder.Add(transaction); err != nil {
return nil, err
}
cids[i] = transaction.Cid().String()
} }
return cids, nil return transaction.Cid().String(), nil
} }

View File

@ -19,6 +19,8 @@ package dag_putters
import ( import (
"fmt" "fmt"
node "github.com/ipfs/go-ipld-format"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld" "github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
) )
@ -31,17 +33,13 @@ func NewEthTxTrieDagPutter(adder *ipfs.IPFS) *EthTxTrieDagPutter {
return &EthTxTrieDagPutter{adder: adder} return &EthTxTrieDagPutter{adder: adder}
} }
func (etdp *EthTxTrieDagPutter) DagPut(raw interface{}) ([]string, error) { func (etdp *EthTxTrieDagPutter) DagPut(n node.Node) (string, error) {
txTrieNodes, ok := raw.([]*ipld.EthTxTrie) txTrieNode, ok := n.(*ipld.EthTxTrie)
if !ok { if !ok {
return nil, fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", []*ipld.EthTxTrie{}, raw) return "", fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", &ipld.EthTxTrie{}, n)
} }
cids := make([]string, len(txTrieNodes)) if err := etdp.adder.Add(txTrieNode); err != nil {
for i, txNode := range txTrieNodes { return "", err
if err := etdp.adder.Add(txNode); err != nil {
return nil, err
}
cids[i] = txNode.Cid().String()
} }
return cids, nil return txTrieNode.Cid().String(), nil
} }

View File

@ -21,35 +21,35 @@ import (
"fmt" "fmt"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
mh "github.com/multiformats/go-multihash"
) )
// FromBlock takes a block and processes it // FromBlockAndReceipts takes a block and processes it
// to return it a set of IPLD nodes for further processing. // to return it a set of IPLD nodes for further processing.
func FromBlock(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthTx, []*EthTxTrie, []*EthReceipt, []*EthRctTrie, error) { func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthHeader, []*EthTx, []*EthTxTrie, []*EthReceipt, []*EthRctTrie, error) {
// process the eth-header object // Process the header
headerRawData := getRLP(block.Header()) headerNode, err := NewEthHeader(block.Header())
cid, err := RawdataToCid(MEthHeader, headerRawData, mh.KECCAK_256)
if err != nil { if err != nil {
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, nil, err
} }
ethHeader := &EthHeader{ // Process the uncles
Header: block.Header(), uncleNodes := make([]*EthHeader, len(block.Uncles()))
cid: cid, for i, uncle := range block.Uncles() {
rawdata: headerRawData, uncleNode, err := NewEthHeader(uncle)
if err != nil {
return nil, nil, nil, nil, nil, nil, err
}
uncleNodes[i] = uncleNode
} }
// Process the txs
// Process the found eth-tx objects
ethTxNodes, ethTxTrieNodes, err := processTransactions(block.Transactions(), ethTxNodes, ethTxTrieNodes, err := processTransactions(block.Transactions(),
block.Header().TxHash[:]) block.Header().TxHash[:])
if err != nil { if err != nil {
return nil, nil, nil, nil, nil, err return nil, nil, nil, nil, nil, nil, err
} }
// process the eth-rct objects // Process the receipts
ethRctNodes, ethRctTrieNodes, err := processReceipts(receipts, ethRctNodes, ethRctTrieNodes, err := processReceipts(receipts,
block.Header().ReceiptHash[:]) block.Header().ReceiptHash[:])
return headerNode, uncleNodes, ethTxNodes, ethTxTrieNodes, ethRctNodes, ethRctTrieNodes, err
return ethHeader, ethTxNodes, ethTxTrieNodes, ethRctNodes, ethRctTrieNodes, nil
} }
// processTransactions will take the found transactions in a parsed block body // processTransactions will take the found transactions in a parsed block body

View File

@ -132,6 +132,9 @@ func (lt *localTrie) add(idx int, rawdata []byte) {
panic(err) panic(err)
} }
lt.keys = append(lt.keys, key) lt.keys = append(lt.keys, key)
if err := lt.db.Put(key, rawdata); err != nil {
panic(err)
}
lt.trie.Update(key, rawdata) lt.trie.Update(key, rawdata)
} }

View File

@ -19,39 +19,35 @@ package mocks
import ( import (
"errors" "errors"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
) )
// DagPutter is a mock for testing the ipfs publisher // DagPutter is a mock for testing the ipfs publisher
type DagPutter struct { type DagPutter struct {
CIDsToReturn []string PassedNode node.Node
PassedRaw interface{} ErrToReturn error
ErrToReturn error
} }
// DagPut returns the pre-loaded CIDs or error // DagPut returns the pre-loaded CIDs or error
func (dp *DagPutter) DagPut(raw interface{}) ([]string, error) { func (dp *DagPutter) DagPut(n node.Node) (string, error) {
dp.PassedRaw = raw dp.PassedNode = n
return dp.CIDsToReturn, dp.ErrToReturn return n.Cid().String(), dp.ErrToReturn
} }
// MappedDagPutter is a mock for testing the ipfs publisher // MappedDagPutter is a mock for testing the ipfs publisher
type MappedDagPutter struct { type MappedDagPutter struct {
CIDsToReturn map[common.Hash][]string CIDsToReturn map[common.Hash]string
PassedRaw interface{} PassedNode node.Node
ErrToReturn error ErrToReturn error
} }
// DagPut returns the pre-loaded CIDs or error // DagPut returns the pre-loaded CIDs or error
func (mdp *MappedDagPutter) DagPut(raw interface{}) ([]string, error) { func (mdp *MappedDagPutter) DagPut(n node.Node) (string, error) {
mdp.PassedRaw = raw
if mdp.CIDsToReturn == nil { if mdp.CIDsToReturn == nil {
return nil, errors.New("mapped dag putter needs to be initialized with a map of cids to return") return "", errors.New("mapped dag putter needs to be initialized with a map of cids to return")
} }
by, ok := raw.([]byte) hash := common.BytesToHash(n.RawData())
if !ok {
return nil, errors.New("mapped dag putters can only dag put []byte values")
}
hash := common.BytesToHash(by)
return mdp.CIDsToReturn[hash], nil return mdp.CIDsToReturn[hash], nil
} }

View File

@ -17,22 +17,20 @@
package btc package btc
import ( import (
"errors"
"fmt" "fmt"
"strconv" "strconv"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters" "github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// IPLDPublisher satisfies the IPLDPublisher for ethereum // IPLDPublisher satisfies the IPLDPublisher for ethereum
type IPLDPublisher struct { type IPLDPublisher struct {
HeaderPutter shared.DagPutter HeaderPutter shared.DagPutter
TransactionPutter shared.DagPutter TransactionPutter shared.DagPutter
TransactionTriePutter shared.DagPutter
} }
// NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface
@ -42,8 +40,9 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
return nil, err return nil, err
} }
return &IPLDPublisher{ return &IPLDPublisher{
HeaderPutter: dag_putters.NewBtcHeaderDagPutter(node), HeaderPutter: dag_putters.NewBtcHeaderDagPutter(node),
TransactionPutter: dag_putters.NewBtcTxDagPutter(node), TransactionPutter: dag_putters.NewBtcTxDagPutter(node),
TransactionTriePutter: dag_putters.NewBtcTxTrieDagPutter(node),
}, nil }, nil
} }
@ -53,8 +52,13 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
if !ok { if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload) return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload)
} }
// Generate nodes
headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs)
if err != nil {
return nil, err
}
// Process and publish headers // Process and publish headers
headerCid, err := pub.publishHeader(ipldPayload.Header) headerCid, err := pub.publishHeader(headerNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -67,7 +71,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
Bits: ipldPayload.Header.Bits, Bits: ipldPayload.Header.Bits,
} }
// Process and publish transactions // Process and publish transactions
transactionCids, err := pub.publishTransactions(ipldPayload.Txs, ipldPayload.TxMetaData) transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,25 +82,22 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
}, nil }, nil
} }
func (pub *IPLDPublisher) publishHeader(header *wire.BlockHeader) (string, error) { func (pub *IPLDPublisher) publishHeader(header *ipld.BtcHeader) (string, error) {
cids, err := pub.HeaderPutter.DagPut(header) cid, err := pub.HeaderPutter.DagPut(header)
if err != nil { if err != nil {
return "", err return "", err
} }
return cids[0], nil return cid, nil
} }
func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) { func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.BtcTx, txTrie []*ipld.BtcTxTrie, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) {
transactionCids, err := pub.TransactionPutter.DagPut(transactions) txCids := make([]TxModelWithInsAndOuts, len(transactions))
if err != nil { for i, tx := range transactions {
return nil, err cid, err := pub.TransactionPutter.DagPut(tx)
} if err != nil {
if len(transactionCids) != len(trxMeta) { return nil, err
return nil, errors.New("expected one CID for each transaction") }
} txCids[i] = TxModelWithInsAndOuts{
mappedTrxCids := make([]TxModelWithInsAndOuts, len(transactionCids))
for i, cid := range transactionCids {
mappedTrxCids[i] = TxModelWithInsAndOuts{
CID: cid, CID: cid,
Index: trxMeta[i].Index, Index: trxMeta[i].Index,
TxHash: trxMeta[i].TxHash, TxHash: trxMeta[i].TxHash,
@ -106,5 +107,11 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*btcutil.Tx, trxMet
TxOutputs: trxMeta[i].TxOutputs, TxOutputs: trxMeta[i].TxOutputs,
} }
} }
return mappedTrxCids, nil for _, txNode := range txTrie {
// We don't do anything with the tx trie cids atm
if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil {
return nil, err
}
}
return txCids, nil
} }

View File

@ -17,6 +17,9 @@
package btc_test package btc_test
import ( import (
"bytes"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
@ -26,23 +29,45 @@ import (
) )
var ( var (
mockHeaderDagPutter *mocks2.DagPutter mockHeaderDagPutter *mocks2.MappedDagPutter
mockTrxDagPutter *mocks2.DagPutter mockTrxDagPutter *mocks2.MappedDagPutter
mockTrxTrieDagPutter *mocks2.DagPutter
) )
var _ = Describe("Publisher", func() { var _ = Describe("Publisher", func() {
BeforeEach(func() { BeforeEach(func() {
mockHeaderDagPutter = new(mocks2.DagPutter) mockHeaderDagPutter = new(mocks2.MappedDagPutter)
mockTrxDagPutter = new(mocks2.DagPutter) mockTrxDagPutter = new(mocks2.MappedDagPutter)
mockTrxTrieDagPutter = new(mocks2.DagPutter)
}) })
Describe("Publish", func() { Describe("Publish", func() {
It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() {
mockHeaderDagPutter.CIDsToReturn = []string{"mockHeaderCID"} by := new(bytes.Buffer)
mockTrxDagPutter.CIDsToReturn = []string{"mockTrxCID1", "mockTrxCID2", "mockTrxCID3"} err := mocks.MockConvertedPayload.BlockPayload.Header.Serialize(by)
Expect(err).ToNot(HaveOccurred())
headerBytes := by.Bytes()
err = mocks.MockTransactions[0].MsgTx().Serialize(by)
Expect(err).ToNot(HaveOccurred())
tx1Bytes := by.Bytes()
err = mocks.MockTransactions[1].MsgTx().Serialize(by)
Expect(err).ToNot(HaveOccurred())
tx2Bytes := by.Bytes()
err = mocks.MockTransactions[2].MsgTx().Serialize(by)
Expect(err).ToNot(HaveOccurred())
tx3Bytes := by.Bytes()
mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(headerBytes): "mockHeaderCID",
}
mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(tx1Bytes): "mockTrxCID1",
common.BytesToHash(tx2Bytes): "mockTrxCID2",
common.BytesToHash(tx3Bytes): "mockTrxCID3",
}
publisher := btc.IPLDPublisher{ publisher := btc.IPLDPublisher{
HeaderPutter: mockHeaderDagPutter, HeaderPutter: mockHeaderDagPutter,
TransactionPutter: mockTrxDagPutter, TransactionPutter: mockTrxDagPutter,
TransactionTriePutter: mockTrxTrieDagPutter,
} }
payload, err := publisher.Publish(mocks.MockConvertedPayload) payload, err := publisher.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -17,7 +17,6 @@
package eth package eth
import ( import (
"errors"
"fmt" "fmt"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -26,16 +25,19 @@ import (
common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common" common2 "github.com/vulcanize/vulcanizedb/pkg/eth/converters/common"
"github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters" "github.com/vulcanize/vulcanizedb/pkg/ipfs/dag_putters"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/ipld"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared" "github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
) )
// IPLDPublisher satisfies the IPLDPublisher for ethereum // IPLDPublisher satisfies the IPLDPublisher for ethereum
type IPLDPublisher struct { type IPLDPublisher struct {
HeaderPutter shared.DagPutter HeaderPutter shared.DagPutter
TransactionPutter shared.DagPutter TransactionPutter shared.DagPutter
ReceiptPutter shared.DagPutter TransactionTriePutter shared.DagPutter
StatePutter shared.DagPutter ReceiptPutter shared.DagPutter
StoragePutter shared.DagPutter ReceiptTriePutter shared.DagPutter
StatePutter shared.DagPutter
StoragePutter shared.DagPutter
} }
// NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface // NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface
@ -45,11 +47,13 @@ func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) {
return nil, err return nil, err
} }
return &IPLDPublisher{ return &IPLDPublisher{
HeaderPutter: dag_putters.NewEthBlockHeaderDagPutter(node), HeaderPutter: dag_putters.NewEthBlockHeaderDagPutter(node),
TransactionPutter: dag_putters.NewEthTxsDagPutter(node), TransactionPutter: dag_putters.NewEthTxsDagPutter(node),
ReceiptPutter: dag_putters.NewEthReceiptDagPutter(node), TransactionTriePutter: dag_putters.NewEthTxTrieDagPutter(node),
StatePutter: dag_putters.NewEthStateDagPutter(node), ReceiptPutter: dag_putters.NewEthReceiptDagPutter(node),
StoragePutter: dag_putters.NewEthStorageDagPutter(node), ReceiptTriePutter: dag_putters.NewEthRctTrieDagPutter(node),
StatePutter: dag_putters.NewEthStateDagPutter(node),
StoragePutter: dag_putters.NewEthStorageDagPutter(node),
}, nil }, nil
} }
@ -59,8 +63,14 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
if !ok { if !ok {
return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload)
} }
// Generate the nodes for publishing
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts)
if err != nil {
return nil, err
}
// Process and publish headers // Process and publish headers
headerCid, err := pub.publishHeader(ipldPayload.Block.Header()) headerCid, err := pub.publishHeader(headerNode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -75,29 +85,29 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
} }
// Process and publish uncles // Process and publish uncles
uncleCids := make([]UncleModel, 0, len(ipldPayload.Block.Uncles())) uncleCids := make([]UncleModel, len(uncleNodes))
for _, uncle := range ipldPayload.Block.Uncles() { for i, uncle := range uncleNodes {
uncleCid, err := pub.publishHeader(uncle) uncleCid, err := pub.publishHeader(uncle)
if err != nil { if err != nil {
return nil, err return nil, err
} }
uncleReward := common2.CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64()) uncleReward := common2.CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64())
uncleCids = append(uncleCids, UncleModel{ uncleCids[i] = UncleModel{
CID: uncleCid, CID: uncleCid,
ParentHash: uncle.ParentHash.String(), ParentHash: uncle.ParentHash.String(),
BlockHash: uncle.Hash().String(), BlockHash: uncle.Hash().String(),
Reward: uncleReward.String(), Reward: uncleReward.String(),
}) }
} }
// Process and publish transactions // Process and publish transactions
transactionCids, err := pub.publishTransactions(ipldPayload.Block.Body().Transactions, ipldPayload.TxMetaData) transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Process and publish receipts // Process and publish receipts
receiptsCids, err := pub.publishReceipts(ipldPayload.Receipts, ipldPayload.ReceiptMetaData) receiptsCids, err := pub.publishReceipts(rctNodes, rctTrieNodes, ipldPayload.ReceiptMetaData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -125,25 +135,23 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
}, nil }, nil
} }
func (pub *IPLDPublisher) publishHeader(header *types.Header) (string, error) { func (pub *IPLDPublisher) generateBlockNodes(body *types.Block, receipts types.Receipts) (*ipld.EthHeader,
cids, err := pub.HeaderPutter.DagPut(header) []*ipld.EthHeader, []*ipld.EthTx, []*ipld.EthTxTrie, []*ipld.EthReceipt, []*ipld.EthRctTrie, error) {
if err != nil { return ipld.FromBlockAndReceipts(body, receipts)
return "", err
}
return cids[0], nil
} }
func (pub *IPLDPublisher) publishTransactions(transactions types.Transactions, trxMeta []TxModel) ([]TxModel, error) { func (pub *IPLDPublisher) publishHeader(header *ipld.EthHeader) (string, error) {
transactionCids, err := pub.TransactionPutter.DagPut(transactions) return pub.HeaderPutter.DagPut(header)
if err != nil { }
return nil, err
} func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.EthTx, txTrie []*ipld.EthTxTrie, trxMeta []TxModel) ([]TxModel, error) {
if len(transactionCids) != len(trxMeta) { trxCids := make([]TxModel, len(transactions))
return nil, errors.New("expected one CID for each transaction") for i, tx := range transactions {
} cid, err := pub.TransactionPutter.DagPut(tx)
mappedTrxCids := make([]TxModel, len(transactionCids)) if err != nil {
for i, cid := range transactionCids { return nil, err
mappedTrxCids[i] = TxModel{ }
trxCids[i] = TxModel{
CID: cid, CID: cid,
Index: trxMeta[i].Index, Index: trxMeta[i].Index,
TxHash: trxMeta[i].TxHash, TxHash: trxMeta[i].TxHash,
@ -151,22 +159,24 @@ func (pub *IPLDPublisher) publishTransactions(transactions types.Transactions, t
Dst: trxMeta[i].Dst, Dst: trxMeta[i].Dst,
} }
} }
return mappedTrxCids, nil for _, txNode := range txTrie {
// We don't do anything with the tx trie cids atm
if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil {
return nil, err
}
}
return trxCids, nil
} }
func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts, receiptMeta []ReceiptModel) (map[common.Hash]ReceiptModel, error) { func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTrie []*ipld.EthRctTrie, receiptMeta []ReceiptModel) (map[common.Hash]ReceiptModel, error) {
receiptsCids, err := pub.ReceiptPutter.DagPut(receipts) rctCids := make(map[common.Hash]ReceiptModel)
if err != nil {
return nil, err
}
if len(receiptsCids) != len(receipts) {
return nil, errors.New("expected one CID for each receipt")
}
// Map receipt cids to their transaction hashes
mappedRctCids := make(map[common.Hash]ReceiptModel, len(receiptsCids))
for i, rct := range receipts { for i, rct := range receipts {
mappedRctCids[rct.TxHash] = ReceiptModel{ cid, err := pub.ReceiptPutter.DagPut(rct)
CID: receiptsCids[i], if err != nil {
return nil, err
}
rctCids[rct.TxHash] = ReceiptModel{
CID: cid,
Contract: receiptMeta[i].Contract, Contract: receiptMeta[i].Contract,
Topic0s: receiptMeta[i].Topic0s, Topic0s: receiptMeta[i].Topic0s,
Topic1s: receiptMeta[i].Topic1s, Topic1s: receiptMeta[i].Topic1s,
@ -174,21 +184,31 @@ func (pub *IPLDPublisher) publishReceipts(receipts types.Receipts, receiptMeta [
Topic3s: receiptMeta[i].Topic3s, Topic3s: receiptMeta[i].Topic3s,
} }
} }
return mappedRctCids, nil for _, rctNode := range receiptTrie {
// We don't do anything with the rct trie cids atm
if _, err := pub.ReceiptTriePutter.DagPut(rctNode); err != nil {
return nil, err
}
}
return rctCids, nil
} }
func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, error) { func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, error) {
stateNodeCids := make([]StateNodeModel, 0, len(stateNodes)) stateNodeCids := make([]StateNodeModel, 0, len(stateNodes))
for _, node := range stateNodes { for _, stateNode := range stateNodes {
cids, err := pub.StatePutter.DagPut(node.Value) node, err := ipld.FromStateTrieRLP(stateNode.Value)
if err != nil {
return nil, err
}
cid, err := pub.StatePutter.DagPut(node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
stateNodeCids = append(stateNodeCids, StateNodeModel{ stateNodeCids = append(stateNodeCids, StateNodeModel{
Path: node.Path, Path: stateNode.Path,
StateKey: node.LeafKey.String(), StateKey: stateNode.LeafKey.String(),
CID: cids[0], CID: cid,
NodeType: ResolveFromNodeType(node.Type), NodeType: ResolveFromNodeType(stateNode.Type),
}) })
} }
return stateNodeCids, nil return stateNodeCids, nil
@ -198,17 +218,21 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[common.Hash][]Tri
storageLeafCids := make(map[common.Hash][]StorageNodeModel) storageLeafCids := make(map[common.Hash][]StorageNodeModel)
for pathHash, storageTrie := range storageNodes { for pathHash, storageTrie := range storageNodes {
storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie)) storageLeafCids[pathHash] = make([]StorageNodeModel, 0, len(storageTrie))
for _, node := range storageTrie { for _, storageNode := range storageTrie {
cids, err := pub.StoragePutter.DagPut(node.Value) node, err := ipld.FromStorageTrieRLP(storageNode.Value)
if err != nil {
return nil, err
}
cid, err := pub.StoragePutter.DagPut(node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Map storage node cids to their path hashes // Map storage node cids to their path hashes
storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{ storageLeafCids[pathHash] = append(storageLeafCids[pathHash], StorageNodeModel{
Path: node.Path, Path: storageNode.Path,
StorageKey: node.LeafKey.Hex(), StorageKey: storageNode.LeafKey.Hex(),
CID: cids[0], CID: cid,
NodeType: ResolveFromNodeType(node.Type), NodeType: ResolveFromNodeType(storageNode.Type),
}) })
} }
} }

View File

@ -27,40 +27,54 @@ import (
) )
var ( var (
mockHeaderDagPutter *mocks2.DagPutter mockHeaderDagPutter *mocks2.MappedDagPutter
mockTrxDagPutter *mocks2.DagPutter mockTrxDagPutter *mocks2.MappedDagPutter
mockRctDagPutter *mocks2.DagPutter mockTrxTrieDagPutter *mocks2.DagPutter
mockRctDagPutter *mocks2.MappedDagPutter
mockRctTrieDagPutter *mocks2.DagPutter
mockStateDagPutter *mocks2.MappedDagPutter mockStateDagPutter *mocks2.MappedDagPutter
mockStorageDagPutter *mocks2.DagPutter mockStorageDagPutter *mocks2.MappedDagPutter
) )
var _ = Describe("Publisher", func() { var _ = Describe("Publisher", func() {
BeforeEach(func() { BeforeEach(func() {
mockHeaderDagPutter = new(mocks2.DagPutter) mockHeaderDagPutter = new(mocks2.MappedDagPutter)
mockTrxDagPutter = new(mocks2.DagPutter) mockTrxDagPutter = new(mocks2.MappedDagPutter)
mockRctDagPutter = new(mocks2.DagPutter) mockTrxTrieDagPutter = new(mocks2.DagPutter)
mockRctDagPutter = new(mocks2.MappedDagPutter)
mockRctTrieDagPutter = new(mocks2.DagPutter)
mockStateDagPutter = new(mocks2.MappedDagPutter) mockStateDagPutter = new(mocks2.MappedDagPutter)
mockStorageDagPutter = new(mocks2.DagPutter) mockStorageDagPutter = new(mocks2.MappedDagPutter)
}) })
Describe("Publish", func() { Describe("Publish", func() {
It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() {
mockHeaderDagPutter.CIDsToReturn = []string{mocks.HeaderCID.String()} mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{
mockTrxDagPutter.CIDsToReturn = []string{mocks.Trx1CID.String(), mocks.Trx2CID.String()} common.BytesToHash(mocks.HeaderIPLD.RawData()): mocks.HeaderCID.String(),
mockRctDagPutter.CIDsToReturn = []string{mocks.Rct1CID.String(), mocks.Rct2CID.String()} }
val1 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[0].Value) mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{
val2 := common.BytesToHash(mocks.MockConvertedPayload.StateNodes[1].Value) common.BytesToHash(mocks.Trx1IPLD.RawData()): mocks.Trx1CID.String(),
mockStateDagPutter.CIDsToReturn = map[common.Hash][]string{ common.BytesToHash(mocks.Trx2IPLD.RawData()): mocks.Trx2CID.String(),
val1: {mocks.State1CID.String()}, }
val2: {mocks.State2CID.String()}, mockRctDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(mocks.Rct1IPLD.RawData()): mocks.Rct1CID.String(),
common.BytesToHash(mocks.Rct2IPLD.RawData()): mocks.Rct2CID.String(),
}
mockStateDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(mocks.State1IPLD.RawData()): mocks.State1CID.String(),
common.BytesToHash(mocks.State2IPLD.RawData()): mocks.State2CID.String(),
}
mockStorageDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(mocks.StorageIPLD.RawData()): mocks.StorageCID.String(),
} }
mockStorageDagPutter.CIDsToReturn = []string{mocks.StorageCID.String()}
publisher := eth.IPLDPublisher{ publisher := eth.IPLDPublisher{
HeaderPutter: mockHeaderDagPutter, HeaderPutter: mockHeaderDagPutter,
TransactionPutter: mockTrxDagPutter, TransactionPutter: mockTrxDagPutter,
ReceiptPutter: mockRctDagPutter, TransactionTriePutter: mockTrxTrieDagPutter,
StatePutter: mockStateDagPutter, ReceiptPutter: mockRctDagPutter,
StoragePutter: mockStorageDagPutter, ReceiptTriePutter: mockRctTrieDagPutter,
StatePutter: mockStateDagPutter,
StoragePutter: mockStorageDagPutter,
} }
payload, err := publisher.Publish(mocks.MockConvertedPayload) payload, err := publisher.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())

View File

@ -18,6 +18,8 @@ package shared
import ( import (
"math/big" "math/big"
node "github.com/ipfs/go-ipld-format"
) )
// PayloadStreamer streams chain-specific payloads to the provided channel // PayloadStreamer streams chain-specific payloads to the provided channel
@ -71,7 +73,7 @@ type ClientSubscription interface {
// DagPutter is a general interface for a dag putter // DagPutter is a general interface for a dag putter
type DagPutter interface { type DagPutter interface {
DagPut(raw interface{}) ([]string, error) DagPut(n node.Node) (string, error)
} }
// Cleaner is for cleaning out data from the cache within the given ranges // Cleaner is for cleaning out data from the cache within the given ranges