From 39354b21144e7170a4dc45e513ada3e15994b8b1 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 10 Aug 2020 12:46:03 -0500 Subject: [PATCH] write code to pg-ipfs table, for codehash lookups --- ...0006_create_eth_transaction_cids_table.sql | 2 + db/schema.sql | 4 +- pkg/btc/ipld_fetcher.go | 108 +++---- pkg/btc/publisher.go | 129 ++++---- pkg/btc/publisher_test.go | 120 ++++--- pkg/eth/converter.go | 5 +- pkg/eth/indexer.go | 12 +- pkg/eth/ipld_fetcher.go | 190 ++++------- pkg/eth/ipld_fetcher_test.go | 140 ++------ pkg/eth/models.go | 2 + pkg/eth/publisher.go | 305 ++++++++---------- pkg/eth/publisher_test.go | 273 ++++++++++++---- 12 files changed, 627 insertions(+), 663 deletions(-) diff --git a/db/migrations/00006_create_eth_transaction_cids_table.sql b/db/migrations/00006_create_eth_transaction_cids_table.sql index cbbef782..f41412c5 100644 --- a/db/migrations/00006_create_eth_transaction_cids_table.sql +++ b/db/migrations/00006_create_eth_transaction_cids_table.sql @@ -8,6 +8,8 @@ CREATE TABLE eth.transaction_cids ( mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, dst VARCHAR(66) NOT NULL, src VARCHAR(66) NOT NULL, + deployment BOOL NOT NULL, + data BYTEA, UNIQUE (header_id, tx_hash) ); diff --git a/db/schema.sql b/db/schema.sql index caa1bb67..05c36096 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -415,7 +415,9 @@ CREATE TABLE eth.transaction_cids ( cid text NOT NULL, mh_key text NOT NULL, dst character varying(66) NOT NULL, - src character varying(66) NOT NULL + src character varying(66) NOT NULL, + deployment boolean NOT NULL, + data bytea ); diff --git a/pkg/btc/ipld_fetcher.go b/pkg/btc/ipld_fetcher.go index 787fad4c..cd673c75 100644 --- a/pkg/btc/ipld_fetcher.go +++ b/pkg/btc/ipld_fetcher.go @@ -17,42 +17,31 @@ package btc import ( - "context" - "errors" "fmt" - "github.com/ipfs/go-block-format" - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" + "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -var ( - errUnexpectedNumberOfIPLDs = errors.New("ipfs batch fetch returned unexpected number of IPLDs") -) - -// IPLDFetcher satisfies the IPLDFetcher interface for ethereum -type IPLDFetcher struct { - BlockService blockservice.BlockService +// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum +// it interfaces directly with PG-IPFS instead of going through a node-interface or remote node +type IPLDPGFetcher struct { + db *postgres.DB } -// NewIPLDFetcher creates a pointer to a new IPLDFetcher -// It interfaces with PG-IPFS through an internalized IPFS node interface -func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { - blockService, err := ipfs.InitIPFSBlockService(ipfsPath) - if err != nil { - return nil, err +// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher +func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { + return &IPLDPGFetcher{ + db: db, } - return &IPLDFetcher{ - BlockService: blockService, - }, nil } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { +func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) @@ -60,76 +49,59 @@ func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { log.Debug("fetching iplds") iplds := IPLDs{} iplds.BlockNumber = cidWrapper.BlockNumber - var err error - iplds.Header, err = f.FetchHeader(cidWrapper.Header) + + tx, err := f.db.Beginx() if err != nil { return nil, err } - iplds.Transactions, err = f.FetchTrxs(cidWrapper.Transactions) + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + + iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) if err != nil { - return nil, err + return nil, fmt.Errorf("btc pg fetcher: header fetching error: %s", err.Error()) } - return iplds, nil + iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) + if err != nil { + return nil, fmt.Errorf("btc pg fetcher: transaction fetching error: %s", err.Error()) + } + return iplds, err } // FetchHeaders fetches headers -// It uses the f.fetch method -func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") - dc, err := cid.Decode(c.CID) - if err != nil { - return ipfs.BlockModel{}, err - } - header, err := f.fetch(dc) + headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return ipfs.BlockModel{}, err } return ipfs.BlockModel{ - Data: header.RawData(), - CID: header.Cid().String(), + Data: headerBytes, + CID: c.CID, }, nil } // FetchTrxs fetches transactions -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") - trxCids := make([]cid.Cid, len(cids)) + trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - dc, err := cid.Decode(c.CID) + trxBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } - trxCids[i] = dc - } - trxs := f.fetchBatch(trxCids) - trxIPLDs := make([]ipfs.BlockModel, len(trxs)) - for i, trx := range trxs { trxIPLDs[i] = ipfs.BlockModel{ - Data: trx.RawData(), - CID: trx.Cid().String(), + Data: trxBytes, + CID: c.CID, } } - if len(trxIPLDs) != len(trxCids) { - log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) - return trxIPLDs, errUnexpectedNumberOfIPLDs - } return trxIPLDs, nil } - -// fetch is used to fetch a single cid -func (f *IPLDFetcher) fetch(cid cid.Cid) (blocks.Block, error) { - return f.BlockService.GetBlock(context.Background(), cid) -} - -// fetchBatch is used to fetch a batch of IPFS data blocks by cid -// There is no guarantee all are fetched, and no error in such a case, so -// downstream we will need to confirm which CIDs were fetched in the result set -func (f *IPLDFetcher) fetchBatch(cids []cid.Cid) []blocks.Block { - fetchedBlocks := make([]blocks.Block, 0, len(cids)) - blockChan := f.BlockService.GetBlocks(context.Background(), cids) - for block := range blockChan { - fetchedBlocks = append(fetchedBlocks, block) - } - return fetchedBlocks -} diff --git a/pkg/btc/publisher.go b/pkg/btc/publisher.go index 5c6c4d07..dd5a72cb 100644 --- a/pkg/btc/publisher.go +++ b/pkg/btc/publisher.go @@ -20,102 +20,107 @@ import ( "fmt" "strconv" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/dag_putters" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPublisher satisfies the IPLDPublisher for ethereum -type IPLDPublisher struct { - HeaderPutter ipfs.DagPutter - TransactionPutter ipfs.DagPutter - TransactionTriePutter ipfs.DagPutter +// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for bitcoin +// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary +// It publishes and indexes IPLDs together in a single sqlx.Tx +type IPLDPublisherAndIndexer struct { + indexer *CIDIndexer } -// NewIPLDPublisher creates a pointer to a new Publisher which satisfies the IPLDPublisher interface -func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { - node, err := ipfs.InitIPFSNode(ipfsPath) - if err != nil { - return nil, err +// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface +func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { + return &IPLDPublisherAndIndexer{ + indexer: NewCIDIndexer(db), } - return &IPLDPublisher{ - HeaderPutter: dag_putters.NewBtcHeaderDagPutter(node), - TransactionPutter: dag_putters.NewBtcTxDagPutter(node), - TransactionTriePutter: dag_putters.NewBtcTxTrieDagPutter(node), - }, nil } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", &ConvertedPayload{}, payload) + return nil, fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload) } - // Generate nodes + // Generate the iplds headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs) if err != nil { return nil, err } - // Process and publish headers - headerCid, err := pub.publishHeader(headerNode) + + // Begin new db tx + tx, err := pub.indexer.db.Beginx() if err != nil { return nil, err } - mhKey, _ := shared.MultihashKeyFromCIDString(headerCid) + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + + // Publish trie nodes + for _, node := range txTrieNodes { + if err := shared.PublishIPLD(tx, node); err != nil { + return nil, err + } + } + + // Publish and index header + if err := shared.PublishIPLD(tx, headerNode); err != nil { + return nil, err + } header := HeaderModel{ - CID: headerCid, - MhKey: mhKey, + CID: headerNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: ipldPayload.Header.PrevBlock.String(), BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)), BlockHash: ipldPayload.Header.BlockHash().String(), Timestamp: ipldPayload.Header.Timestamp.UnixNano(), Bits: ipldPayload.Header.Bits, } - // Process and publish transactions - transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData) + headerID, err := pub.indexer.indexHeaderCID(tx, header) if err != nil { return nil, err } - // Package CIDs and their metadata into a single struct - return &CIDPayload{ - HeaderCID: header, - TransactionCIDs: transactionCids, - }, nil -} -func (pub *IPLDPublisher) publishHeader(header *ipld.BtcHeader) (string, error) { - cid, err := pub.HeaderPutter.DagPut(header) - if err != nil { - return "", err - } - return cid, nil -} - -func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.BtcTx, txTrie []*ipld.BtcTxTrie, trxMeta []TxModelWithInsAndOuts) ([]TxModelWithInsAndOuts, error) { - txCids := make([]TxModelWithInsAndOuts, len(transactions)) - for i, tx := range transactions { - cid, err := pub.TransactionPutter.DagPut(tx) + // Publish and index txs + for i, txNode := range txNodes { + if err := shared.PublishIPLD(tx, txNode); err != nil { + return nil, err + } + txModel := ipldPayload.TxMetaData[i] + txModel.CID = txNode.Cid().String() + txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) + txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { return nil, err } - mhKey, _ := shared.MultihashKeyFromCIDString(cid) - txCids[i] = TxModelWithInsAndOuts{ - CID: cid, - MhKey: mhKey, - Index: trxMeta[i].Index, - TxHash: trxMeta[i].TxHash, - SegWit: trxMeta[i].SegWit, - WitnessHash: trxMeta[i].WitnessHash, - TxInputs: trxMeta[i].TxInputs, - TxOutputs: trxMeta[i].TxOutputs, + for _, input := range txModel.TxInputs { + if err := pub.indexer.indexTxInput(tx, input, txID); err != nil { + return nil, err + } + } + for _, output := range txModel.TxOutputs { + if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil { + return nil, err + } } } - for _, txNode := range txTrie { - // We don't do anything with the tx trie cids atm - if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil { - return nil, err - } - } - return txCids, nil + + // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer + return nil, err +} + +// Index satisfies the shared.CIDIndexer interface +func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { + return nil } diff --git a/pkg/btc/publisher_test.go b/pkg/btc/publisher_test.go index ef809f3a..3271db9a 100644 --- a/pkg/btc/publisher_test.go +++ b/pkg/btc/publisher_test.go @@ -19,63 +19,103 @@ package btc_test import ( "bytes" - "github.com/ethereum/go-ethereum/common" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-ds-help" + "github.com/multiformats/go-multihash" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc/mocks" - mocks2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/mocks" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -var ( - mockHeaderDagPutter *mocks2.MappedDagPutter - mockTrxDagPutter *mocks2.MappedDagPutter - mockTrxTrieDagPutter *mocks2.DagPutter -) - -var _ = Describe("Publisher", func() { +var _ = Describe("PublishAndIndexer", func() { + var ( + db *postgres.DB + err error + repo *btc.IPLDPublisherAndIndexer + ipfsPgGet = `SELECT data FROM public.blocks + WHERE key = $1` + ) BeforeEach(func() { - mockHeaderDagPutter = new(mocks2.MappedDagPutter) - mockTrxDagPutter = new(mocks2.MappedDagPutter) - mockTrxTrieDagPutter = new(mocks2.DagPutter) + db, err = shared.SetupDB() + Expect(err).ToNot(HaveOccurred()) + repo = btc.NewIPLDPublisherAndIndexer(db) + }) + AfterEach(func() { + btc.TearDownDB(db) }) Describe("Publish", func() { - It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { - by := new(bytes.Buffer) - err := mocks.MockConvertedPayload.BlockPayload.Header.Serialize(by) + It("Published and indexes header and transaction IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) Expect(err).ToNot(HaveOccurred()) - headerBytes := by.Bytes() - err = mocks.MockTransactions[0].MsgTx().Serialize(by) + pgStr := `SELECT * FROM btc.header_cids + WHERE block_number = $1` + // check header was properly indexed + buf := bytes.NewBuffer(make([]byte, 0, 80)) + err = mocks.MockBlock.Header.Serialize(buf) Expect(err).ToNot(HaveOccurred()) - tx1Bytes := by.Bytes() - err = mocks.MockTransactions[1].MsgTx().Serialize(by) + headerBytes := buf.Bytes() + c, _ := ipld.RawdataToCid(ipld.MBitcoinHeader, headerBytes, multihash.DBL_SHA2_256) + header := new(btc.HeaderModel) + err = db.Get(header, pgStr, mocks.MockHeaderMetaData.BlockNumber) Expect(err).ToNot(HaveOccurred()) - tx2Bytes := by.Bytes() - err = mocks.MockTransactions[2].MsgTx().Serialize(by) + Expect(header.CID).To(Equal(c.String())) + Expect(header.BlockNumber).To(Equal(mocks.MockHeaderMetaData.BlockNumber)) + Expect(header.Bits).To(Equal(mocks.MockHeaderMetaData.Bits)) + Expect(header.Timestamp).To(Equal(mocks.MockHeaderMetaData.Timestamp)) + Expect(header.BlockHash).To(Equal(mocks.MockHeaderMetaData.BlockHash)) + Expect(header.ParentHash).To(Equal(mocks.MockHeaderMetaData.ParentHash)) + dc, err := cid.Decode(header.CID) Expect(err).ToNot(HaveOccurred()) - tx3Bytes := by.Bytes() - mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(headerBytes): mocks.MockHeaderCID.String(), + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(headerBytes)) + + // check that txs were properly indexed + trxs := make([]btc.TxModel, 0) + pgStr = `SELECT transaction_cids.id, transaction_cids.header_id, transaction_cids.index, + transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.segwit, transaction_cids.witness_hash + FROM btc.transaction_cids INNER JOIN btc.header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(&trxs, pgStr, mocks.MockHeaderMetaData.BlockNumber) + Expect(err).ToNot(HaveOccurred()) + Expect(len(trxs)).To(Equal(3)) + txData := make([][]byte, len(mocks.MockTransactions)) + txCIDs := make([]string, len(mocks.MockTransactions)) + for i, m := range mocks.MockTransactions { + buf := bytes.NewBuffer(make([]byte, 0)) + err = m.MsgTx().Serialize(buf) + Expect(err).ToNot(HaveOccurred()) + tx := buf.Bytes() + txData[i] = tx + c, _ := ipld.RawdataToCid(ipld.MBitcoinTx, tx, multihash.DBL_SHA2_256) + txCIDs[i] = c.String() } - mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(tx1Bytes): mocks.MockTrxCID1.String(), - common.BytesToHash(tx2Bytes): mocks.MockTrxCID2.String(), - common.BytesToHash(tx3Bytes): mocks.MockTrxCID3.String(), + for _, tx := range trxs { + Expect(tx.SegWit).To(Equal(false)) + Expect(tx.HeaderID).To(Equal(header.ID)) + Expect(tx.WitnessHash).To(Equal("")) + Expect(tx.CID).To(Equal(txCIDs[tx.Index])) + Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[tx.Index].TxHash().String())) + dc, err := cid.Decode(tx.CID) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(txData[tx.Index])) } - publisher := btc.IPLDPublisher{ - HeaderPutter: mockHeaderDagPutter, - TransactionPutter: mockTrxDagPutter, - TransactionTriePutter: mockTrxTrieDagPutter, - } - payload, err := publisher.Publish(mocks.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - cidPayload, ok := payload.(*btc.CIDPayload) - Expect(ok).To(BeTrue()) - Expect(cidPayload).To(Equal(&mocks.MockCIDPayload)) - Expect(cidPayload.HeaderCID).To(Equal(mocks.MockHeaderMetaData)) - Expect(cidPayload.TransactionCIDs).To(Equal(mocks.MockTxsMetaDataPostPublish)) }) }) }) diff --git a/pkg/eth/converter.go b/pkg/eth/converter.go index 1d6d2d31..eb534773 100644 --- a/pkg/eth/converter.go +++ b/pkg/eth/converter.go @@ -71,11 +71,13 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert if err != nil { return nil, err } + txMeta := TxModel{ Dst: shared.HandleZeroAddrPointer(trx.To()), Src: shared.HandleZeroAddr(from), TxHash: trx.Hash().String(), Index: int64(i), + Data: trx.Data(), } // txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody convertedPayload.TxMetaData = append(convertedPayload.TxMetaData, txMeta) @@ -90,7 +92,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert if err := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { return nil, err } - for _, receipt := range receipts { + for i, receipt := range receipts { // Extract topic and contract data from the receipt for indexing topicSets := make([][]string, 4) mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses @@ -109,6 +111,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert contract := shared.HandleZeroAddr(receipt.ContractAddress) var contractHash string if contract != "" { + convertedPayload.TxMetaData[i].Deployment = true contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() } rctMeta := ReceiptModel{ diff --git a/pkg/eth/indexer.go b/pkg/eth/indexer.go index ff57d2d4..b48f9a3b 100644 --- a/pkg/eth/indexer.go +++ b/pkg/eth/indexer.go @@ -109,10 +109,10 @@ func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int6 func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for _, trxCidMeta := range payload.TransactionCIDs { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, data, deployment) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, data, deployment) = ($3, $4, $5, $6, $7, $8, $9) RETURNING id`, - headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey).Scan(&txID) + headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey, trxCidMeta.Data, trxCidMeta.Deployment).Scan(&txID) if err != nil { return err } @@ -128,10 +128,10 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModel, headerID int64) (int64, error) { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, data, deployment) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, data, deployment) = ($3, $4, $5, $6, $7, $8, $9) RETURNING id`, - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey).Scan(&txID) + headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Deployment).Scan(&txID) return txID, err } diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index e17e85b5..bbfe6b3d 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -17,203 +17,168 @@ package eth import ( - "context" "errors" "fmt" "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ipfs/go-block-format" - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" + "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -var ( - errUnexpectedNumberOfIPLDs = errors.New("ipfs batch fetch returned unexpected number of IPLDs") -) - -// IPLDFetcher satisfies the IPLDFetcher interface for ethereum -type IPLDFetcher struct { - BlockService blockservice.BlockService +// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum +// It interfaces directly with PG-IPFS +type IPLDPGFetcher struct { + db *postgres.DB } -// NewIPLDFetcher creates a pointer to a new IPLDFetcher -func NewIPLDFetcher(ipfsPath string) (*IPLDFetcher, error) { - blockService, err := ipfs.InitIPFSBlockService(ipfsPath) - if err != nil { - return nil, err +// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher +func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { + return &IPLDPGFetcher{ + db: db, } - return &IPLDFetcher{ - BlockService: blockService, - }, nil } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { +func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) } log.Debug("fetching iplds") - var err error iplds := IPLDs{} iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10) if !ok { return nil, errors.New("eth fetcher: unable to set total difficulty") } iplds.BlockNumber = cidWrapper.BlockNumber - iplds.Header, err = f.FetchHeader(cidWrapper.Header) + + tx, err := f.db.Beginx() if err != nil { return nil, err } - iplds.Uncles, err = f.FetchUncles(cidWrapper.Uncles) + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + + iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) if err != nil { - return nil, err + return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) } - iplds.Transactions, err = f.FetchTrxs(cidWrapper.Transactions) + iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles) if err != nil { - return nil, err + return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) } - iplds.Receipts, err = f.FetchRcts(cidWrapper.Receipts) + iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) if err != nil { - return nil, err + return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) } - iplds.StateNodes, err = f.FetchState(cidWrapper.StateNodes) + iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts) if err != nil { - return nil, err + return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) } - iplds.StorageNodes, err = f.FetchStorage(cidWrapper.StorageNodes) + iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes) if err != nil { - return nil, err + return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) } - return iplds, nil + iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes) + if err != nil { + return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) + } + return iplds, err } // FetchHeaders fetches headers -// It uses the f.fetch method -func (f *IPLDFetcher) FetchHeader(c HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") - dc, err := cid.Decode(c.CID) - if err != nil { - return ipfs.BlockModel{}, err - } - header, err := f.fetch(dc) + headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return ipfs.BlockModel{}, err } return ipfs.BlockModel{ - Data: header.RawData(), - CID: header.Cid().String(), + Data: headerBytes, + CID: c.CID, }, nil } // FetchUncles fetches uncles -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([]ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { log.Debug("fetching uncle iplds") - uncleCids := make([]cid.Cid, len(cids)) + uncleIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - dc, err := cid.Decode(c.CID) + uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } - uncleCids[i] = dc - } - uncles := f.fetchBatch(uncleCids) - uncleIPLDs := make([]ipfs.BlockModel, len(uncles)) - for i, uncle := range uncles { uncleIPLDs[i] = ipfs.BlockModel{ - Data: uncle.RawData(), - CID: uncle.Cid().String(), + Data: uncleBytes, + CID: c.CID, } } - if len(uncleIPLDs) != len(uncleCids) { - log.Errorf("ipfs fetcher: number of uncle blocks returned (%d) does not match number expected (%d)", len(uncles), len(uncleCids)) - return uncleIPLDs, errUnexpectedNumberOfIPLDs - } return uncleIPLDs, nil } // FetchTrxs fetches transactions -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchTrxs(cids []TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") - trxCids := make([]cid.Cid, len(cids)) + trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - dc, err := cid.Decode(c.CID) + txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } - trxCids[i] = dc - } - trxs := f.fetchBatch(trxCids) - trxIPLDs := make([]ipfs.BlockModel, len(trxs)) - for i, trx := range trxs { trxIPLDs[i] = ipfs.BlockModel{ - Data: trx.RawData(), - CID: trx.Cid().String(), + Data: txBytes, + CID: c.CID, } } - if len(trxIPLDs) != len(trxCids) { - log.Errorf("ipfs fetcher: number of transaction blocks returned (%d) does not match number expected (%d)", len(trxs), len(trxCids)) - return trxIPLDs, errUnexpectedNumberOfIPLDs - } return trxIPLDs, nil } // FetchRcts fetches receipts -// It uses the f.fetchBatch method -func (f *IPLDFetcher) FetchRcts(cids []ReceiptModel) ([]ipfs.BlockModel, error) { +func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { log.Debug("fetching receipt iplds") - rctCids := make([]cid.Cid, len(cids)) + rctIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - dc, err := cid.Decode(c.CID) + rctBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } - rctCids[i] = dc - } - rcts := f.fetchBatch(rctCids) - rctIPLDs := make([]ipfs.BlockModel, len(rcts)) - for i, rct := range rcts { rctIPLDs[i] = ipfs.BlockModel{ - Data: rct.RawData(), - CID: rct.Cid().String(), + Data: rctBytes, + CID: c.CID, } } - if len(rctIPLDs) != len(rctCids) { - log.Errorf("ipfs fetcher: number of receipt blocks returned (%d) does not match number expected (%d)", len(rcts), len(rctCids)) - return rctIPLDs, errUnexpectedNumberOfIPLDs - } return rctIPLDs, nil } // FetchState fetches state nodes -// It uses the single f.fetch method instead of the batch fetch, because it -// needs to maintain the data's relation to state keys -func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) { +func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { log.Debug("fetching state iplds") stateNodes := make([]StateNode, 0, len(cids)) for _, stateNode := range cids { if stateNode.CID == "" { continue } - dc, err := cid.Decode(stateNode.CID) - if err != nil { - return nil, err - } - state, err := f.fetch(dc) + stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey) if err != nil { return nil, err } stateNodes = append(stateNodes, StateNode{ IPLD: ipfs.BlockModel{ - Data: state.RawData(), - CID: state.Cid().String(), + Data: stateBytes, + CID: stateNode.CID, }, StateLeafKey: common.HexToHash(stateNode.StateKey), Type: ResolveToNodeType(stateNode.NodeType), @@ -224,27 +189,21 @@ func (f *IPLDFetcher) FetchState(cids []StateNodeModel) ([]StateNode, error) { } // FetchStorage fetches storage nodes -// It uses the single f.fetch method instead of the batch fetch, because it -// needs to maintain the data's relation to state and storage keys -func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { +func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { log.Debug("fetching storage iplds") storageNodes := make([]StorageNode, 0, len(cids)) for _, storageNode := range cids { if storageNode.CID == "" || storageNode.StateKey == "" { continue } - dc, err := cid.Decode(storageNode.CID) - if err != nil { - return nil, err - } - storage, err := f.fetch(dc) + storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey) if err != nil { return nil, err } storageNodes = append(storageNodes, StorageNode{ IPLD: ipfs.BlockModel{ - Data: storage.RawData(), - CID: storage.Cid().String(), + Data: storageBytes, + CID: storageNode.CID, }, StateLeafKey: common.HexToHash(storageNode.StateKey), StorageLeafKey: common.HexToHash(storageNode.StorageKey), @@ -254,20 +213,3 @@ func (f *IPLDFetcher) FetchStorage(cids []StorageNodeWithStateKeyModel) ([]Stora } return storageNodes, nil } - -// fetch is used to fetch a single cid -func (f *IPLDFetcher) fetch(cid cid.Cid) (blocks.Block, error) { - return f.BlockService.GetBlock(context.Background(), cid) -} - -// fetchBatch is used to fetch a batch of IPFS data blocks by cid -// There is no guarantee all are fetched, and no error in such a case, so -// downstream we will need to confirm which CIDs were fetched in the result set -func (f *IPLDFetcher) fetchBatch(cids []cid.Cid) []blocks.Block { - fetchedBlocks := make([]blocks.Block, 0, len(cids)) - blockChan := f.BlockService.GetBlocks(context.Background(), cids) - for block := range blockChan { - fetchedBlocks = append(fetchedBlocks, block) - } - return fetchedBlocks -} diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index 3ee52a65..a3ee7c78 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -17,139 +17,49 @@ package eth_test import ( - "bytes" - "math/big" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff" - "github.com/ipfs/go-block-format" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/mocks" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) var ( - mockHeaderData = []byte{0, 1, 2, 3, 4} - mockUncleData = []byte{1, 2, 3, 4, 5} - mockTrxData = []byte{2, 3, 4, 5, 6} - mockReceiptData = []byte{3, 4, 5, 6, 7} - mockStateData = []byte{4, 5, 6, 7, 8} - mockStorageData = []byte{5, 6, 7, 8, 9} - mockStorageData2 = []byte{6, 7, 8, 9, 1} - mockHeaderBlock = blocks.NewBlock(mockHeaderData) - mockUncleBlock = blocks.NewBlock(mockUncleData) - mockTrxBlock = blocks.NewBlock(mockTrxData) - mockReceiptBlock = blocks.NewBlock(mockReceiptData) - mockStateBlock = blocks.NewBlock(mockStateData) - mockStorageBlock1 = blocks.NewBlock(mockStorageData) - mockStorageBlock2 = blocks.NewBlock(mockStorageData2) - mockBlocks = []blocks.Block{mockHeaderBlock, mockUncleBlock, mockTrxBlock, mockReceiptBlock, mockStateBlock, mockStorageBlock1, mockStorageBlock2} - mockBlockService *mocks.MockIPFSBlockService - mockCIDWrapper = ð.CIDWrapper{ - BlockNumber: big.NewInt(9000), - Header: eth.HeaderModel{ - TotalDifficulty: "1337", - CID: mockHeaderBlock.Cid().String(), - }, - Uncles: []eth.UncleModel{ - { - CID: mockUncleBlock.Cid().String(), - }, - }, - Transactions: []eth.TxModel{ - { - CID: mockTrxBlock.Cid().String(), - }, - }, - Receipts: []eth.ReceiptModel{ - { - CID: mockReceiptBlock.Cid().String(), - }, - }, - StateNodes: []eth.StateNodeModel{{ - CID: mockStateBlock.Cid().String(), - NodeType: 2, - StateKey: "0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470", - }}, - StorageNodes: []eth.StorageNodeWithStateKeyModel{{ - CID: mockStorageBlock1.Cid().String(), - NodeType: 2, - StateKey: "0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470", - StorageKey: "0000000000000000000000000000000000000000000000000000000000000001", - }, - { - CID: mockStorageBlock2.Cid().String(), - NodeType: 2, - StateKey: "0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470", - StorageKey: "0000000000000000000000000000000000000000000000000000000000000002", - }}, - } + db *postgres.DB + pubAndIndexer *eth.IPLDPublisherAndIndexer + fetcher *eth.IPLDPGFetcher ) -var _ = Describe("IPLDFetcher", func() { +var _ = Describe("IPLDPGFetcher", func() { Describe("Fetch", func() { BeforeEach(func() { - mockBlockService = new(mocks.MockIPFSBlockService) - err := mockBlockService.AddBlocks(mockBlocks) + var err error + db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - Expect(len(mockBlockService.Blocks)).To(Equal(7)) + pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db) + _, err = pubAndIndexer.Publish(mocks.MockConvertedPayload) + Expect(err).ToNot(HaveOccurred()) + fetcher = eth.NewIPLDPGFetcher(db) + }) + AfterEach(func() { + eth.TearDownDB(db) }) It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { - fetcher := new(eth.IPLDFetcher) - fetcher.BlockService = mockBlockService - i, err := fetcher.Fetch(mockCIDWrapper) + i, err := fetcher.Fetch(mocks.MockCIDWrapper) Expect(err).ToNot(HaveOccurred()) iplds, ok := i.(eth.IPLDs) Expect(ok).To(BeTrue()) - Expect(iplds.TotalDifficulty).To(Equal(big.NewInt(1337))) - Expect(iplds.BlockNumber).To(Equal(mockCIDWrapper.BlockNumber)) - Expect(iplds.Header).To(Equal(ipfs.BlockModel{ - Data: mockHeaderBlock.RawData(), - CID: mockHeaderBlock.Cid().String(), - })) - Expect(len(iplds.Uncles)).To(Equal(1)) - Expect(iplds.Uncles[0]).To(Equal(ipfs.BlockModel{ - Data: mockUncleBlock.RawData(), - CID: mockUncleBlock.Cid().String(), - })) - Expect(len(iplds.Transactions)).To(Equal(1)) - Expect(iplds.Transactions[0]).To(Equal(ipfs.BlockModel{ - Data: mockTrxBlock.RawData(), - CID: mockTrxBlock.Cid().String(), - })) - Expect(len(iplds.Receipts)).To(Equal(1)) - Expect(iplds.Receipts[0]).To(Equal(ipfs.BlockModel{ - Data: mockReceiptBlock.RawData(), - CID: mockReceiptBlock.Cid().String(), - })) - Expect(len(iplds.StateNodes)).To(Equal(1)) - Expect(iplds.StateNodes[0].StateLeafKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))) - Expect(iplds.StateNodes[0].Type).To(Equal(statediff.Leaf)) - Expect(iplds.StateNodes[0].IPLD).To(Equal(ipfs.BlockModel{ - Data: mockStateBlock.RawData(), - CID: mockStateBlock.Cid().String(), - })) - Expect(len(iplds.StorageNodes)).To(Equal(2)) - for _, storage := range iplds.StorageNodes { - Expect(storage.Type).To(Equal(statediff.Leaf)) - Expect(storage.StateLeafKey).To(Equal(common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470"))) - if bytes.Equal(storage.StorageLeafKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000001").Bytes()) { - Expect(storage.IPLD).To(Equal(ipfs.BlockModel{ - Data: mockStorageBlock1.RawData(), - CID: mockStorageBlock1.Cid().String(), - })) - } - if bytes.Equal(storage.StorageLeafKey.Bytes(), common.HexToHash("0000000000000000000000000000000000000000000000000000000000000002").Bytes()) { - Expect(storage.IPLD).To(Equal(ipfs.BlockModel{ - Data: mockStorageBlock2.RawData(), - CID: mockStorageBlock2.Cid().String(), - })) - } - } + Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty)) + Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number())) + Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) + Expect(len(iplds.Uncles)).To(Equal(0)) + Expect(iplds.Transactions).To(Equal(mocks.MockIPLDs.Transactions)) + Expect(iplds.Receipts).To(Equal(mocks.MockIPLDs.Receipts)) + Expect(iplds.StateNodes).To(Equal(mocks.MockIPLDs.StateNodes)) + Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes)) }) }) }) diff --git a/pkg/eth/models.go b/pkg/eth/models.go index 6c44a801..b78eda5c 100644 --- a/pkg/eth/models.go +++ b/pkg/eth/models.go @@ -59,6 +59,8 @@ type TxModel struct { MhKey string `db:"mh_key"` Dst string `db:"dst"` Src string `db:"src"` + Data []byte `db:"data"` + Deployment bool `db:"deployment"` } // ReceiptModel is the db model for eth.receipt_cids diff --git a/pkg/eth/publisher.go b/pkg/eth/publisher.go index 89169270..9a500631 100644 --- a/pkg/eth/publisher.go +++ b/pkg/eth/publisher.go @@ -21,64 +21,77 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff" + "github.com/jmoiron/sqlx" + "github.com/multiformats/go-multihash" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/dag_putters" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPublisher satisfies the IPLDPublisher for ethereum -type IPLDPublisher struct { - HeaderPutter ipfs.DagPutter - TransactionPutter ipfs.DagPutter - TransactionTriePutter ipfs.DagPutter - ReceiptPutter ipfs.DagPutter - ReceiptTriePutter ipfs.DagPutter - StatePutter ipfs.DagPutter - StoragePutter ipfs.DagPutter +// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for ethereum +// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary +// It publishes and indexes IPLDs together in a single sqlx.Tx +type IPLDPublisherAndIndexer struct { + indexer *CIDIndexer } -// NewIPLDPublisher creates a pointer to a new IPLDPublisher which satisfies the IPLDPublisher interface -func NewIPLDPublisher(ipfsPath string) (*IPLDPublisher, error) { - node, err := ipfs.InitIPFSNode(ipfsPath) - if err != nil { - return nil, err +// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface +func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { + return &IPLDPublisherAndIndexer{ + indexer: NewCIDIndexer(db), } - return &IPLDPublisher{ - HeaderPutter: dag_putters.NewEthBlockHeaderDagPutter(node), - TransactionPutter: dag_putters.NewEthTxsDagPutter(node), - TransactionTriePutter: dag_putters.NewEthTxTrieDagPutter(node), - ReceiptPutter: dag_putters.NewEthReceiptDagPutter(node), - ReceiptTriePutter: dag_putters.NewEthRctTrieDagPutter(node), - StatePutter: dag_putters.NewEthStateDagPutter(node), - StoragePutter: dag_putters.NewEthStorageDagPutter(node), - }, nil } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth publisher expected payload type %T got %T", ConvertedPayload{}, payload) + return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload) } - // Generate the nodes for publishing + // Generate the iplds headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) if err != nil { return nil, err } - // Process and publish headers - headerCid, err := pub.publishHeader(headerNode) + // Begin new db tx + tx, err := pub.indexer.db.Beginx() if err != nil { return nil, err } + defer func() { + if p := recover(); p != nil { + shared.Rollback(tx) + panic(p) + } else if err != nil { + shared.Rollback(tx) + } else { + err = tx.Commit() + } + }() + + // Publish trie nodes + for _, node := range txTrieNodes { + if err := shared.PublishIPLD(tx, node); err != nil { + return nil, err + } + } + for _, node := range rctTrieNodes { + if err := shared.PublishIPLD(tx, node); err != nil { + return nil, err + } + } + + // Publish and index header + if err := shared.PublishIPLD(tx, headerNode); err != nil { + return nil, err + } reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) header := HeaderModel{ - CID: headerCid, + CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: ipldPayload.Block.ParentHash().String(), BlockNumber: ipldPayload.Block.Number().String(), @@ -92,189 +105,129 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI UncleRoot: ipldPayload.Block.UncleHash().String(), Timestamp: ipldPayload.Block.Time(), } + headerID, err := pub.indexer.indexHeaderCID(tx, header) + if err != nil { + return nil, err + } - // Process and publish uncles - uncleCids := make([]UncleModel, len(uncleNodes)) - for i, uncle := range uncleNodes { - uncleCid, err := pub.publishHeader(uncle) - if err != nil { + // Publish and index uncles + for _, uncleNode := range uncleNodes { + if err := shared.PublishIPLD(tx, uncleNode); err != nil { return nil, err } - uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64()) - uncleCids[i] = UncleModel{ - CID: uncleCid, - MhKey: shared.MultihashKeyFromCID(uncle.Cid()), - ParentHash: uncle.ParentHash.String(), - BlockHash: uncle.Hash().String(), + uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64()) + uncle := UncleModel{ + CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), + ParentHash: uncleNode.ParentHash.String(), + BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), } - } - - // Process and publish transactions - transactionCids, err := pub.publishTransactions(txNodes, txTrieNodes, ipldPayload.TxMetaData) - if err != nil { - return nil, err - } - - // Process and publish receipts - receiptsCids, err := pub.publishReceipts(rctNodes, rctTrieNodes, ipldPayload.ReceiptMetaData) - if err != nil { - return nil, err - } - - // Process and publish state leafs - stateNodeCids, stateAccounts, err := pub.publishStateNodes(ipldPayload.StateNodes) - if err != nil { - return nil, err - } - - // Process and publish storage leafs - storageNodeCids, err := pub.publishStorageNodes(ipldPayload.StorageNodes) - if err != nil { - return nil, err - } - - // Package CIDs and their metadata into a single struct - return &CIDPayload{ - HeaderCID: header, - UncleCIDs: uncleCids, - TransactionCIDs: transactionCids, - ReceiptCIDs: receiptsCids, - StateNodeCIDs: stateNodeCids, - StorageNodeCIDs: storageNodeCids, - StateAccounts: stateAccounts, - }, nil -} - -func (pub *IPLDPublisher) generateBlockNodes(body *types.Block, receipts types.Receipts) (*ipld.EthHeader, - []*ipld.EthHeader, []*ipld.EthTx, []*ipld.EthTxTrie, []*ipld.EthReceipt, []*ipld.EthRctTrie, error) { - return ipld.FromBlockAndReceipts(body, receipts) -} - -func (pub *IPLDPublisher) publishHeader(header *ipld.EthHeader) (string, error) { - return pub.HeaderPutter.DagPut(header) -} - -func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.EthTx, txTrie []*ipld.EthTxTrie, trxMeta []TxModel) ([]TxModel, error) { - trxCids := make([]TxModel, len(transactions)) - for i, tx := range transactions { - cid, err := pub.TransactionPutter.DagPut(tx) - if err != nil { - return nil, err - } - trxCids[i] = TxModel{ - CID: cid, - MhKey: shared.MultihashKeyFromCID(tx.Cid()), - Index: trxMeta[i].Index, - TxHash: trxMeta[i].TxHash, - Src: trxMeta[i].Src, - Dst: trxMeta[i].Dst, - } - } - for _, txNode := range txTrie { - // We don't do anything with the tx trie cids atm - if _, err := pub.TransactionTriePutter.DagPut(txNode); err != nil { + if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil { return nil, err } } - return trxCids, nil -} -func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTrie []*ipld.EthRctTrie, receiptMeta []ReceiptModel) (map[common.Hash]ReceiptModel, error) { - rctCids := make(map[common.Hash]ReceiptModel) - for i, rct := range receipts { - cid, err := pub.ReceiptPutter.DagPut(rct) + // Publish and index txs and receipts + for i, txNode := range txNodes { + if err := shared.PublishIPLD(tx, txNode); err != nil { + return nil, err + } + rctNode := rctNodes[i] + if err := shared.PublishIPLD(tx, rctNode); err != nil { + return nil, err + } + txModel := ipldPayload.TxMetaData[i] + txModel.CID = txNode.Cid().String() + txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) + txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { return nil, err } - rctCids[rct.TxHash] = ReceiptModel{ - CID: cid, - MhKey: shared.MultihashKeyFromCID(rct.Cid()), - Contract: receiptMeta[i].Contract, - ContractHash: receiptMeta[i].ContractHash, - Topic0s: receiptMeta[i].Topic0s, - Topic1s: receiptMeta[i].Topic1s, - Topic2s: receiptMeta[i].Topic2s, - Topic3s: receiptMeta[i].Topic3s, - LogContracts: receiptMeta[i].LogContracts, + if txModel.Deployment { + if _, err = shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, txModel.Data); err != nil { + return nil, err + } } - } - for _, rctNode := range receiptTrie { - // We don't do anything with the rct trie cids atm - if _, err := pub.ReceiptTriePutter.DagPut(rctNode); err != nil { + rctModel := ipldPayload.ReceiptMetaData[i] + rctModel.CID = rctNode.Cid().String() + rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid()) + if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil { return nil, err } } - return rctCids, nil + + // Publish and index state and storage + err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID) + + // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer + return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer } -func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeModel, map[string]StateAccountModel, error) { - stateNodeCids := make([]StateNodeModel, 0, len(stateNodes)) - stateAccounts := make(map[string]StateAccountModel) - for _, stateNode := range stateNodes { - node, err := ipld.FromStateTrieRLP(stateNode.Value) +func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { + // Publish and index state and storage + for _, stateNode := range ipldPayload.StateNodes { + stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value) if err != nil { - return nil, nil, err + return err } - cid, err := pub.StatePutter.DagPut(node) - if err != nil { - return nil, nil, err - } - stateNodeCids = append(stateNodeCids, StateNodeModel{ + mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr) + stateModel := StateNodeModel{ Path: stateNode.Path, StateKey: stateNode.LeafKey.String(), - CID: cid, - MhKey: shared.MultihashKeyFromCID(node.Cid()), + CID: stateCIDStr, + MhKey: mhKey, NodeType: ResolveFromNodeType(stateNode.Type), - }) - // If we have a leaf, decode the account to extract additional metadata for indexing + } + stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID) + if err != nil { + return err + } + // If we have a leaf, decode and index the account data and any associated storage diffs if stateNode.Type == statediff.Leaf { var i []interface{} if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil { - return nil, nil, err + return err } if len(i) != 2 { - return nil, nil, fmt.Errorf("IPLDPublisher expected state leaf node rlp to decode into two elements") + return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") } var account state.Account if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { - return nil, nil, err + return err } - // Map state account to the state path hash - statePath := common.Bytes2Hex(stateNode.Path) - stateAccounts[statePath] = StateAccountModel{ + accountModel := StateAccountModel{ Balance: account.Balance.String(), Nonce: account.Nonce, CodeHash: account.CodeHash, StorageRoot: account.Root.String(), } + if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil { + return err + } + for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { + storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value) + if err != nil { + return err + } + mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr) + storageModel := StorageNodeModel{ + Path: storageNode.Path, + StorageKey: storageNode.LeafKey.Hex(), + CID: storageCIDStr, + MhKey: mhKey, + NodeType: ResolveFromNodeType(storageNode.Type), + } + if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil { + return err + } + } } } - return stateNodeCids, stateAccounts, nil + return nil } -func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode) (map[string][]StorageNodeModel, error) { - storageLeafCids := make(map[string][]StorageNodeModel) - for path, storageTrie := range storageNodes { - storageLeafCids[path] = make([]StorageNodeModel, 0, len(storageTrie)) - for _, storageNode := range storageTrie { - node, err := ipld.FromStorageTrieRLP(storageNode.Value) - if err != nil { - return nil, err - } - cid, err := pub.StoragePutter.DagPut(node) - if err != nil { - return nil, err - } - // Map storage node cids to the state path hash - storageLeafCids[path] = append(storageLeafCids[path], StorageNodeModel{ - Path: storageNode.Path, - StorageKey: storageNode.LeafKey.Hex(), - CID: cid, - MhKey: shared.MultihashKeyFromCID(node.Cid()), - NodeType: ResolveFromNodeType(storageNode.Type), - }) - } - } - return storageLeafCids, nil +// Index satisfies the shared.CIDIndexer interface +func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { + return nil } diff --git a/pkg/eth/publisher_test.go b/pkg/eth/publisher_test.go index cd98954f..c90d10ae 100644 --- a/pkg/eth/publisher_test.go +++ b/pkg/eth/publisher_test.go @@ -18,88 +18,221 @@ package eth_test import ( "github.com/ethereum/go-ethereum/common" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-ipfs-blockstore" + "github.com/ipfs/go-ipfs-ds-help" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" - mocks2 "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/mocks" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -var ( - mockHeaderDagPutter *mocks2.MappedDagPutter - mockTrxDagPutter *mocks2.MappedDagPutter - mockTrxTrieDagPutter *mocks2.DagPutter - mockRctDagPutter *mocks2.MappedDagPutter - mockRctTrieDagPutter *mocks2.DagPutter - mockStateDagPutter *mocks2.MappedDagPutter - mockStorageDagPutter *mocks2.MappedDagPutter -) - -var _ = Describe("Publisher", func() { +var _ = Describe("PublishAndIndexer", func() { + var ( + db *postgres.DB + err error + repo *eth.IPLDPublisherAndIndexer + ipfsPgGet = `SELECT data FROM public.blocks + WHERE key = $1` + ) BeforeEach(func() { - mockHeaderDagPutter = new(mocks2.MappedDagPutter) - mockTrxDagPutter = new(mocks2.MappedDagPutter) - mockTrxTrieDagPutter = new(mocks2.DagPutter) - mockRctDagPutter = new(mocks2.MappedDagPutter) - mockRctTrieDagPutter = new(mocks2.DagPutter) - mockStateDagPutter = new(mocks2.MappedDagPutter) - mockStorageDagPutter = new(mocks2.MappedDagPutter) + db, err = shared.SetupDB() + Expect(err).ToNot(HaveOccurred()) + repo = eth.NewIPLDPublisherAndIndexer(db) + }) + AfterEach(func() { + eth.TearDownDB(db) }) Describe("Publish", func() { - It("Publishes the passed IPLDPayload objects to IPFS and returns a CIDPayload for indexing", func() { - mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(mocks.HeaderIPLD.RawData()): mocks.HeaderCID.String(), - } - mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(mocks.Trx1IPLD.RawData()): mocks.Trx1CID.String(), - common.BytesToHash(mocks.Trx2IPLD.RawData()): mocks.Trx2CID.String(), - common.BytesToHash(mocks.Trx3IPLD.RawData()): mocks.Trx3CID.String(), - } - mockRctDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(mocks.Rct1IPLD.RawData()): mocks.Rct1CID.String(), - common.BytesToHash(mocks.Rct2IPLD.RawData()): mocks.Rct2CID.String(), - common.BytesToHash(mocks.Rct3IPLD.RawData()): mocks.Rct3CID.String(), - } - mockStateDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(mocks.State1IPLD.RawData()): mocks.State1CID.String(), - common.BytesToHash(mocks.State2IPLD.RawData()): mocks.State2CID.String(), - } - mockStorageDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(mocks.StorageIPLD.RawData()): mocks.StorageCID.String(), - } - publisher := eth.IPLDPublisher{ - HeaderPutter: mockHeaderDagPutter, - TransactionPutter: mockTrxDagPutter, - TransactionTriePutter: mockTrxTrieDagPutter, - ReceiptPutter: mockRctDagPutter, - ReceiptTriePutter: mockRctTrieDagPutter, - StatePutter: mockStateDagPutter, - StoragePutter: mockStorageDagPutter, - } - payload, err := publisher.Publish(mocks.MockConvertedPayload) + It("Published and indexes header IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) Expect(err).ToNot(HaveOccurred()) - cidPayload, ok := payload.(*eth.CIDPayload) - Expect(ok).To(BeTrue()) - Expect(cidPayload.HeaderCID.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty.String())) - Expect(cidPayload.HeaderCID.BlockNumber).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockNumber)) - Expect(cidPayload.HeaderCID.BlockHash).To(Equal(mocks.MockCIDPayload.HeaderCID.BlockHash)) - Expect(cidPayload.HeaderCID.Reward).To(Equal(mocks.MockCIDPayload.HeaderCID.Reward)) - Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs)) - Expect(cidPayload.HeaderCID).To(Equal(mocks.MockCIDPayload.HeaderCID)) - Expect(len(cidPayload.TransactionCIDs)).To(Equal(3)) - Expect(cidPayload.TransactionCIDs[0]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[0])) - Expect(cidPayload.TransactionCIDs[1]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[1])) - Expect(cidPayload.TransactionCIDs[2]).To(Equal(mocks.MockCIDPayload.TransactionCIDs[2])) - Expect(len(cidPayload.ReceiptCIDs)).To(Equal(3)) - Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[0].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[0].Hash()])) - Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[1].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[1].Hash()])) - Expect(cidPayload.ReceiptCIDs[mocks.MockTransactions[2].Hash()]).To(Equal(mocks.MockCIDPayload.ReceiptCIDs[mocks.MockTransactions[2].Hash()])) - Expect(len(cidPayload.StateNodeCIDs)).To(Equal(2)) - Expect(cidPayload.StateNodeCIDs[0]).To(Equal(mocks.MockCIDPayload.StateNodeCIDs[0])) - Expect(cidPayload.StateNodeCIDs[1]).To(Equal(mocks.MockCIDPayload.StateNodeCIDs[1])) - Expect(cidPayload.StorageNodeCIDs).To(Equal(mocks.MockCIDPayload.StorageNodeCIDs)) + pgStr := `SELECT cid, td, reward, id + FROM eth.header_cids + WHERE block_number = $1` + // check header was properly indexed + type res struct { + CID string + TD string + Reward string + ID int + } + header := new(res) + err = db.QueryRowx(pgStr, 1).StructScan(header) + Expect(err).ToNot(HaveOccurred()) + Expect(header.CID).To(Equal(mocks.HeaderCID.String())) + Expect(header.TD).To(Equal(mocks.MockBlock.Difficulty().String())) + Expect(header.Reward).To(Equal("5000000000000000000")) + dc, err := cid.Decode(header.CID) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(mocks.MockHeaderRlp)) + }) + + It("Publishes and indexes transaction IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) + // check that txs were properly indexed + trxs := make([]string, 0) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(&trxs, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(trxs)).To(Equal(3)) + Expect(shared.ListContainsString(trxs, mocks.Trx1CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(trxs, mocks.Trx2CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(trxs, mocks.Trx3CID.String())).To(BeTrue()) + // and published + for _, c := range trxs { + dc, err := cid.Decode(c) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + switch c { + case mocks.Trx1CID.String(): + Expect(data).To(Equal(mocks.MockTransactions.GetRlp(0))) + case mocks.Trx2CID.String(): + Expect(data).To(Equal(mocks.MockTransactions.GetRlp(1))) + case mocks.Trx3CID.String(): + Expect(data).To(Equal(mocks.MockTransactions.GetRlp(2))) + } + } + }) + + It("Publishes and indexes receipt IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) + // check receipts were properly indexed + rcts := make([]string, 0) + pgStr := `SELECT receipt_cids.cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.id + AND transaction_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + err = db.Select(&rcts, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(rcts)).To(Equal(3)) + Expect(shared.ListContainsString(rcts, mocks.Rct1CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(rcts, mocks.Rct2CID.String())).To(BeTrue()) + Expect(shared.ListContainsString(rcts, mocks.Rct3CID.String())).To(BeTrue()) + // and published + for _, c := range rcts { + dc, err := cid.Decode(c) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + switch c { + case mocks.Rct1CID.String(): + Expect(data).To(Equal(mocks.MockReceipts.GetRlp(0))) + case mocks.Rct2CID.String(): + Expect(data).To(Equal(mocks.MockReceipts.GetRlp(1))) + case mocks.Rct3CID.String(): + Expect(data).To(Equal(mocks.MockReceipts.GetRlp(2))) + } + } + }) + + It("Publishes and indexes state IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) + // check that state nodes were properly indexed and published + stateNodes := make([]eth.StateNodeModel, 0) + pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1` + err = db.Select(&stateNodes, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(stateNodes)).To(Equal(2)) + for _, stateNode := range stateNodes { + var data []byte + dc, err := cid.Decode(stateNode.CID) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + var account eth.StateAccountModel + err = db.Get(&account, pgStr, stateNode.ID) + Expect(err).ToNot(HaveOccurred()) + if stateNode.CID == mocks.State1CID.String() { + Expect(stateNode.NodeType).To(Equal(2)) + Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex())) + Expect(stateNode.Path).To(Equal([]byte{'\x06'})) + Expect(data).To(Equal(mocks.ContractLeafNode)) + Expect(account).To(Equal(eth.StateAccountModel{ + ID: account.ID, + StateID: stateNode.ID, + Balance: "0", + CodeHash: mocks.ContractCodeHash.Bytes(), + StorageRoot: mocks.ContractRoot, + Nonce: 1, + })) + } + if stateNode.CID == mocks.State2CID.String() { + Expect(stateNode.NodeType).To(Equal(2)) + Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.AccountLeafKey).Hex())) + Expect(stateNode.Path).To(Equal([]byte{'\x0c'})) + Expect(data).To(Equal(mocks.AccountLeafNode)) + Expect(account).To(Equal(eth.StateAccountModel{ + ID: account.ID, + StateID: stateNode.ID, + Balance: "1000", + CodeHash: mocks.AccountCodeHash.Bytes(), + StorageRoot: mocks.AccountRoot, + Nonce: 0, + })) + } + } + pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + }) + + It("Publishes and indexes storage IPLDs in a single tx", func() { + emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) + Expect(emptyReturn).To(BeNil()) + Expect(err).ToNot(HaveOccurred()) + // check that storage nodes were properly indexed + storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0) + pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1` + err = db.Select(&storageNodes, pgStr, 1) + Expect(err).ToNot(HaveOccurred()) + Expect(len(storageNodes)).To(Equal(1)) + Expect(storageNodes[0]).To(Equal(eth.StorageNodeWithStateKeyModel{ + CID: mocks.StorageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + })) + var data []byte + dc, err := cid.Decode(storageNodes[0].CID) + Expect(err).ToNot(HaveOccurred()) + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = db.Get(&data, ipfsPgGet, prefixedKey) + Expect(err).ToNot(HaveOccurred()) + Expect(data).To(Equal(mocks.StorageLeafNode)) }) }) })