From a532e17e21667cd804d8816eb0c58680c8cd3592 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Mon, 10 Aug 2020 13:04:07 -0500 Subject: [PATCH] remove option for publishing through ipfs node interface --- README.md | 5 +- cmd/resync.go | 8 - cmd/watch.go | 6 - dockerfiles/super_node/entrypoint.sh | 1 - dockerfiles/super_node/startup_script.sh | 1 - documentation/architecture.md | 4 - documentation/resync.md | 3 - environments/superNodeETH.toml | 1 + pkg/btc/ipld_fetcher.go | 16 +- pkg/btc/ipld_pg_fetcher.go | 107 ---------- pkg/btc/publish_and_indexer.go | 126 ------------ pkg/btc/publish_and_indexer_test.go | 121 ------------ pkg/btc/publisher.go | 40 ++-- pkg/btc/publisher_test.go | 7 +- pkg/builders/builders.go | 66 +------ pkg/eth/api_test.go | 10 +- pkg/eth/backend.go | 2 +- pkg/eth/cid_retriever_test.go | 92 ++++----- pkg/eth/helpers.go | 3 +- pkg/eth/ipld_fetcher.go | 24 +-- pkg/eth/ipld_fetcher_test.go | 12 +- pkg/eth/ipld_pg_fetcher.go | 215 -------------------- pkg/eth/ipld_pg_fetcher_test.go | 65 ------- pkg/eth/mocks/publisher.go | 16 +- pkg/eth/mocks/test_data.go | 45 +++-- pkg/eth/models.go | 20 +- pkg/eth/publish_and_indexer.go | 228 ---------------------- pkg/eth/publish_and_indexer_test.go | 238 ----------------------- pkg/eth/publisher.go | 53 +++-- pkg/eth/publisher_test.go | 19 +- pkg/historical/config.go | 14 -- pkg/historical/service.go | 15 +- pkg/historical/service_test.go | 20 -- pkg/ipfs/builders.go | 90 --------- pkg/ipfs/dag_putters/btc_header.go | 50 ----- pkg/ipfs/dag_putters/btc_tx.go | 46 ----- pkg/ipfs/dag_putters/btc_tx_trie.go | 46 ----- pkg/ipfs/dag_putters/eth_header.go | 46 ----- pkg/ipfs/dag_putters/eth_receipt.go | 46 ----- pkg/ipfs/dag_putters/eth_receipt_trie.go | 46 ----- pkg/ipfs/dag_putters/eth_state.go | 46 ----- pkg/ipfs/dag_putters/eth_storage.go | 46 ----- pkg/ipfs/dag_putters/eth_tx.go | 46 ----- pkg/ipfs/dag_putters/eth_tx_trie.go | 46 ----- pkg/ipfs/interfaces.go | 26 --- pkg/ipfs/mocks/blockservice.go | 86 -------- pkg/ipfs/mocks/dag_putters.go | 53 ----- pkg/resync/config.go | 12 -- pkg/resync/service.go | 15 +- pkg/shared/env.go | 29 --- pkg/shared/intefaces.go | 7 +- pkg/shared/ipfs_mode.go | 58 ------ pkg/watch/config.go | 14 -- pkg/watch/service.go | 54 ++--- pkg/watch/service_test.go | 6 - 55 files changed, 206 insertions(+), 2311 deletions(-) delete mode 100644 pkg/btc/ipld_pg_fetcher.go delete mode 100644 pkg/btc/publish_and_indexer.go delete mode 100644 pkg/btc/publish_and_indexer_test.go delete mode 100644 pkg/eth/ipld_pg_fetcher.go delete mode 100644 pkg/eth/ipld_pg_fetcher_test.go delete mode 100644 pkg/eth/publish_and_indexer.go delete mode 100644 pkg/eth/publish_and_indexer_test.go delete mode 100644 pkg/ipfs/builders.go delete mode 100644 pkg/ipfs/dag_putters/btc_header.go delete mode 100644 pkg/ipfs/dag_putters/btc_tx.go delete mode 100644 pkg/ipfs/dag_putters/btc_tx_trie.go delete mode 100644 pkg/ipfs/dag_putters/eth_header.go delete mode 100644 pkg/ipfs/dag_putters/eth_receipt.go delete mode 100644 pkg/ipfs/dag_putters/eth_receipt_trie.go delete mode 100644 pkg/ipfs/dag_putters/eth_state.go delete mode 100644 pkg/ipfs/dag_putters/eth_storage.go delete mode 100644 pkg/ipfs/dag_putters/eth_tx.go delete mode 100644 pkg/ipfs/dag_putters/eth_tx_trie.go delete mode 100644 pkg/ipfs/interfaces.go delete mode 100644 pkg/ipfs/mocks/blockservice.go delete mode 100644 pkg/ipfs/mocks/dag_putters.go delete mode 100644 pkg/shared/ipfs_mode.go diff --git a/README.md b/README.md index 21eba062..306ed2a1 100644 --- a/README.md +++ b/README.md @@ -161,10 +161,6 @@ This set of parameters needs to be set no matter the chain type. user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD -[ipfs] - path = "~/.ipfs" # $IPFS_PATH - mode = "postgres" # $IPFS_MODE - [watcher] chain = "bitcoin" # $SUPERNODE_CHAIN server = true # $SUPERNODE_SERVER @@ -207,6 +203,7 @@ For Ethereum: clientName = "Geth" # $ETH_CLIENT_NAME genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK networkID = "1" # $ETH_NETWORK_ID + chainID = "1" # $ETH_CHAIN_ID ``` ### Exposing the data diff --git a/cmd/resync.go b/cmd/resync.go index 9ffde23e..0a04f3d5 100644 --- a/cmd/resync.go +++ b/cmd/resync.go @@ -19,9 +19,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/resync" v "github.com/vulcanize/ipfs-blockchain-watcher/version" ) @@ -46,11 +43,6 @@ func rsyncCmdCommand() { logWithCommand.Fatal(err) } logWithCommand.Infof("resync config: %+v", rConfig) - if rConfig.IPFSMode == shared.LocalInterface { - if err := ipfs.InitIPFSPlugins(); err != nil { - logWithCommand.Fatal(err) - } - } logWithCommand.Debug("initializing new resync service") rService, err := resync.NewResyncService(rConfig) if err != nil { diff --git a/cmd/watch.go b/cmd/watch.go index de4acc92..f0512557 100644 --- a/cmd/watch.go +++ b/cmd/watch.go @@ -26,7 +26,6 @@ import ( "github.com/spf13/viper" h "github.com/vulcanize/ipfs-blockchain-watcher/pkg/historical" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" w "github.com/vulcanize/ipfs-blockchain-watcher/pkg/watch" v "github.com/vulcanize/ipfs-blockchain-watcher/version" @@ -65,11 +64,6 @@ func watch() { logWithCommand.Fatal(err) } logWithCommand.Infof("watcher config: %+v", watcherConfig) - if watcherConfig.IPFSMode == shared.LocalInterface { - if err := ipfs.InitIPFSPlugins(); err != nil { - logWithCommand.Fatal(err) - } - } logWithCommand.Debug("initializing new watcher service") watcher, err := w.NewWatcher(watcherConfig) if err != nil { diff --git a/dockerfiles/super_node/entrypoint.sh b/dockerfiles/super_node/entrypoint.sh index 7df52bc5..ff93bc60 100755 --- a/dockerfiles/super_node/entrypoint.sh +++ b/dockerfiles/super_node/entrypoint.sh @@ -13,7 +13,6 @@ set +x #test $DATABASE_USER #test $DATABASE_PASSWORD #test $IPFS_INIT -#test $IPFS_PATH VDB_COMMAND=${VDB_COMMAND:-watch} set +e diff --git a/dockerfiles/super_node/startup_script.sh b/dockerfiles/super_node/startup_script.sh index 89039372..3ff9ca02 100755 --- a/dockerfiles/super_node/startup_script.sh +++ b/dockerfiles/super_node/startup_script.sh @@ -12,7 +12,6 @@ test $DATABASE_PORT test $DATABASE_USER test $DATABASE_PASSWORD test $IPFS_INIT -test $IPFS_PATH test $VDB_COMMAND set +e diff --git a/documentation/architecture.md b/documentation/architecture.md index 8b617c10..dd18e100 100644 --- a/documentation/architecture.md +++ b/documentation/architecture.md @@ -52,10 +52,6 @@ This set of parameters needs to be set no matter the chain type. user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD -[ipfs] - path = "~/.ipfs" # $IPFS_PATH - mode = "direct" # $IPFS_MODE - [watcher] chain = "bitcoin" # $SUPERNODE_CHAIN server = true # $SUPERNODE_SERVER diff --git a/documentation/resync.md b/documentation/resync.md index 44e5332d..b0de3c2e 100644 --- a/documentation/resync.md +++ b/documentation/resync.md @@ -30,9 +30,6 @@ This set of parameters needs to be set no matter the chain type. port = 5432 # $DATABASE_PORT user = "vdbm" # $DATABASE_USER password = "" # $DATABASE_PASSWORD - -[ipfs] - path = "~/.ipfs" # $IPFS_PATH [resync] chain = "ethereum" # $RESYNC_CHAIN diff --git a/environments/superNodeETH.toml b/environments/superNodeETH.toml index 11bcc709..02cd5d22 100644 --- a/environments/superNodeETH.toml +++ b/environments/superNodeETH.toml @@ -43,3 +43,4 @@ clientName = "Geth" # $ETH_CLIENT_NAME genesisBlock = "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3" # $ETH_GENESIS_BLOCK networkID = "1" # $ETH_NETWORK_ID + chainID = "1" # $ETH_CHAIN_ID diff --git a/pkg/btc/ipld_fetcher.go b/pkg/btc/ipld_fetcher.go index cd673c75..0e8f1213 100644 --- a/pkg/btc/ipld_fetcher.go +++ b/pkg/btc/ipld_fetcher.go @@ -27,21 +27,21 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum +// IPLDFetcher satisfies the IPLDFetcher interface for ethereum // it interfaces directly with PG-IPFS instead of going through a node-interface or remote node -type IPLDPGFetcher struct { +type IPLDFetcher struct { db *postgres.DB } -// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher -func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { - return &IPLDPGFetcher{ +// NewIPLDFetcher creates a pointer to a new IPLDFetcher +func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher { + return &IPLDFetcher{ db: db, } } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) @@ -77,7 +77,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) } // FetchHeaders fetches headers -func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { @@ -90,7 +90,7 @@ func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel } // FetchTrxs fetches transactions -func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { diff --git a/pkg/btc/ipld_pg_fetcher.go b/pkg/btc/ipld_pg_fetcher.go deleted file mode 100644 index cd673c75..00000000 --- a/pkg/btc/ipld_pg_fetcher.go +++ /dev/null @@ -1,107 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc - -import ( - "fmt" - - "github.com/jmoiron/sqlx" - log "github.com/sirupsen/logrus" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum -// it interfaces directly with PG-IPFS instead of going through a node-interface or remote node -type IPLDPGFetcher struct { - db *postgres.DB -} - -// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher -func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { - return &IPLDPGFetcher{ - db: db, - } -} - -// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { - cidWrapper, ok := cids.(*CIDWrapper) - if !ok { - return nil, fmt.Errorf("btc fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) - } - log.Debug("fetching iplds") - iplds := IPLDs{} - iplds.BlockNumber = cidWrapper.BlockNumber - - tx, err := f.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) - if err != nil { - return nil, fmt.Errorf("btc pg fetcher: header fetching error: %s", err.Error()) - } - iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) - if err != nil { - return nil, fmt.Errorf("btc pg fetcher: transaction fetching error: %s", err.Error()) - } - return iplds, err -} - -// FetchHeaders fetches headers -func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { - log.Debug("fetching header ipld") - headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return ipfs.BlockModel{}, err - } - return ipfs.BlockModel{ - Data: headerBytes, - CID: c.CID, - }, nil -} - -// FetchTrxs fetches transactions -func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching transaction iplds") - trxIPLDs := make([]ipfs.BlockModel, len(cids)) - for i, c := range cids { - trxBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return nil, err - } - trxIPLDs[i] = ipfs.BlockModel{ - Data: trxBytes, - CID: c.CID, - } - } - return trxIPLDs, nil -} diff --git a/pkg/btc/publish_and_indexer.go b/pkg/btc/publish_and_indexer.go deleted file mode 100644 index dd5a72cb..00000000 --- a/pkg/btc/publish_and_indexer.go +++ /dev/null @@ -1,126 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc - -import ( - "fmt" - "strconv" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for bitcoin -// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary -// It publishes and indexes IPLDs together in a single sqlx.Tx -type IPLDPublisherAndIndexer struct { - indexer *CIDIndexer -} - -// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface -func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { - return &IPLDPublisherAndIndexer{ - indexer: NewCIDIndexer(db), - } -} - -// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(ConvertedPayload) - if !ok { - return nil, fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload) - } - // Generate the iplds - headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs) - if err != nil { - return nil, err - } - - // Begin new db tx - tx, err := pub.indexer.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - // Publish trie nodes - for _, node := range txTrieNodes { - if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err - } - } - - // Publish and index header - if err := shared.PublishIPLD(tx, headerNode); err != nil { - return nil, err - } - header := HeaderModel{ - CID: headerNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), - ParentHash: ipldPayload.Header.PrevBlock.String(), - BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)), - BlockHash: ipldPayload.Header.BlockHash().String(), - Timestamp: ipldPayload.Header.Timestamp.UnixNano(), - Bits: ipldPayload.Header.Bits, - } - headerID, err := pub.indexer.indexHeaderCID(tx, header) - if err != nil { - return nil, err - } - - // Publish and index txs - for i, txNode := range txNodes { - if err := shared.PublishIPLD(tx, txNode); err != nil { - return nil, err - } - txModel := ipldPayload.TxMetaData[i] - txModel.CID = txNode.Cid().String() - txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) - txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) - if err != nil { - return nil, err - } - for _, input := range txModel.TxInputs { - if err := pub.indexer.indexTxInput(tx, input, txID); err != nil { - return nil, err - } - } - for _, output := range txModel.TxOutputs { - if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil { - return nil, err - } - } - } - - // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer - return nil, err -} - -// Index satisfies the shared.CIDIndexer interface -func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { - return nil -} diff --git a/pkg/btc/publish_and_indexer_test.go b/pkg/btc/publish_and_indexer_test.go deleted file mode 100644 index 3271db9a..00000000 --- a/pkg/btc/publish_and_indexer_test.go +++ /dev/null @@ -1,121 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package btc_test - -import ( - "bytes" - - "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-ipfs-ds-help" - "github.com/multiformats/go-multihash" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc/mocks" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -var _ = Describe("PublishAndIndexer", func() { - var ( - db *postgres.DB - err error - repo *btc.IPLDPublisherAndIndexer - ipfsPgGet = `SELECT data FROM public.blocks - WHERE key = $1` - ) - BeforeEach(func() { - db, err = shared.SetupDB() - Expect(err).ToNot(HaveOccurred()) - repo = btc.NewIPLDPublisherAndIndexer(db) - }) - AfterEach(func() { - btc.TearDownDB(db) - }) - - Describe("Publish", func() { - It("Published and indexes header and transaction IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - pgStr := `SELECT * FROM btc.header_cids - WHERE block_number = $1` - // check header was properly indexed - buf := bytes.NewBuffer(make([]byte, 0, 80)) - err = mocks.MockBlock.Header.Serialize(buf) - Expect(err).ToNot(HaveOccurred()) - headerBytes := buf.Bytes() - c, _ := ipld.RawdataToCid(ipld.MBitcoinHeader, headerBytes, multihash.DBL_SHA2_256) - header := new(btc.HeaderModel) - err = db.Get(header, pgStr, mocks.MockHeaderMetaData.BlockNumber) - Expect(err).ToNot(HaveOccurred()) - Expect(header.CID).To(Equal(c.String())) - Expect(header.BlockNumber).To(Equal(mocks.MockHeaderMetaData.BlockNumber)) - Expect(header.Bits).To(Equal(mocks.MockHeaderMetaData.Bits)) - Expect(header.Timestamp).To(Equal(mocks.MockHeaderMetaData.Timestamp)) - Expect(header.BlockHash).To(Equal(mocks.MockHeaderMetaData.BlockHash)) - Expect(header.ParentHash).To(Equal(mocks.MockHeaderMetaData.ParentHash)) - dc, err := cid.Decode(header.CID) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - var data []byte - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - Expect(data).To(Equal(headerBytes)) - - // check that txs were properly indexed - trxs := make([]btc.TxModel, 0) - pgStr = `SELECT transaction_cids.id, transaction_cids.header_id, transaction_cids.index, - transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.segwit, transaction_cids.witness_hash - FROM btc.transaction_cids INNER JOIN btc.header_cids ON (transaction_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $1` - err = db.Select(&trxs, pgStr, mocks.MockHeaderMetaData.BlockNumber) - Expect(err).ToNot(HaveOccurred()) - Expect(len(trxs)).To(Equal(3)) - txData := make([][]byte, len(mocks.MockTransactions)) - txCIDs := make([]string, len(mocks.MockTransactions)) - for i, m := range mocks.MockTransactions { - buf := bytes.NewBuffer(make([]byte, 0)) - err = m.MsgTx().Serialize(buf) - Expect(err).ToNot(HaveOccurred()) - tx := buf.Bytes() - txData[i] = tx - c, _ := ipld.RawdataToCid(ipld.MBitcoinTx, tx, multihash.DBL_SHA2_256) - txCIDs[i] = c.String() - } - for _, tx := range trxs { - Expect(tx.SegWit).To(Equal(false)) - Expect(tx.HeaderID).To(Equal(header.ID)) - Expect(tx.WitnessHash).To(Equal("")) - Expect(tx.CID).To(Equal(txCIDs[tx.Index])) - Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[tx.Index].TxHash().String())) - dc, err := cid.Decode(tx.CID) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - var data []byte - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - Expect(data).To(Equal(txData[tx.Index])) - } - }) - }) -}) diff --git a/pkg/btc/publisher.go b/pkg/btc/publisher.go index dd5a72cb..74a3ba56 100644 --- a/pkg/btc/publisher.go +++ b/pkg/btc/publisher.go @@ -25,36 +25,36 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for bitcoin +// IPLDPublisher satisfies the IPLDPublisher interface for bitcoin // It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary // It publishes and indexes IPLDs together in a single sqlx.Tx -type IPLDPublisherAndIndexer struct { +type IPLDPublisher struct { indexer *CIDIndexer } -// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface -func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { - return &IPLDPublisherAndIndexer{ +// NewIPLDPublisher creates a pointer to a new eth IPLDPublisher which satisfies the IPLDPublisher interface +func NewIPLDPublisher(db *postgres.DB) *IPLDPublisher { + return &IPLDPublisher{ indexer: NewCIDIndexer(db), } } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error { ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload) + return fmt.Errorf("btc publisher expected payload type %T got %T", ConvertedPayload{}, payload) } // Generate the iplds headerNode, txNodes, txTrieNodes, err := ipld.FromHeaderAndTxs(ipldPayload.Header, ipldPayload.Txs) if err != nil { - return nil, err + return err } // Begin new db tx tx, err := pub.indexer.db.Beginx() if err != nil { - return nil, err + return err } defer func() { if p := recover(); p != nil { @@ -70,13 +70,13 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share // Publish trie nodes for _, node := range txTrieNodes { if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err + return err } } // Publish and index header if err := shared.PublishIPLD(tx, headerNode); err != nil { - return nil, err + return err } header := HeaderModel{ CID: headerNode.Cid().String(), @@ -89,38 +89,32 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share } headerID, err := pub.indexer.indexHeaderCID(tx, header) if err != nil { - return nil, err + return err } // Publish and index txs for i, txNode := range txNodes { if err := shared.PublishIPLD(tx, txNode); err != nil { - return nil, err + return err } txModel := ipldPayload.TxMetaData[i] txModel.CID = txNode.Cid().String() txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { - return nil, err + return err } for _, input := range txModel.TxInputs { if err := pub.indexer.indexTxInput(tx, input, txID); err != nil { - return nil, err + return err } } for _, output := range txModel.TxOutputs { if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil { - return nil, err + return err } } } - // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer - return nil, err -} - -// Index satisfies the shared.CIDIndexer interface -func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { - return nil + return err } diff --git a/pkg/btc/publisher_test.go b/pkg/btc/publisher_test.go index 3271db9a..b2277b6d 100644 --- a/pkg/btc/publisher_test.go +++ b/pkg/btc/publisher_test.go @@ -37,14 +37,14 @@ var _ = Describe("PublishAndIndexer", func() { var ( db *postgres.DB err error - repo *btc.IPLDPublisherAndIndexer + repo *btc.IPLDPublisher ipfsPgGet = `SELECT data FROM public.blocks WHERE key = $1` ) BeforeEach(func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = btc.NewIPLDPublisherAndIndexer(db) + repo = btc.NewIPLDPublisher(db) }) AfterEach(func() { btc.TearDownDB(db) @@ -52,8 +52,7 @@ var _ = Describe("PublishAndIndexer", func() { Describe("Publish", func() { It("Published and indexes header and transaction IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) pgStr := `SELECT * FROM btc.header_cids WHERE block_number = $1` diff --git a/pkg/builders/builders.go b/pkg/builders/builders.go index d44b48d1..226dc2ef 100644 --- a/pkg/builders/builders.go +++ b/pkg/builders/builders.go @@ -42,32 +42,6 @@ func NewResponseFilterer(chain shared.ChainType) (shared.ResponseFilterer, error } } -// NewCIDIndexer constructs a CIDIndexer for the provided chain type -func NewCIDIndexer(chain shared.ChainType, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.CIDIndexer, error) { - switch chain { - case shared.Ethereum: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return eth.NewCIDIndexer(db), nil - case shared.DirectPostgres: - return eth.NewIPLDPublisherAndIndexer(db), nil - default: - return nil, fmt.Errorf("ethereum CIDIndexer unexpected ipfs mode %s", ipfsMode.String()) - } - case shared.Bitcoin: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return btc.NewCIDIndexer(db), nil - case shared.DirectPostgres: - return eth.NewIPLDPublisherAndIndexer(db), nil - default: - return nil, fmt.Errorf("bitcoin CIDIndexer unexpected ipfs mode %s", ipfsMode.String()) - } - default: - return nil, fmt.Errorf("invalid chain %s for indexer constructor", chain.String()) - } -} - // NewCIDRetriever constructs a CIDRetriever for the provided chain type func NewCIDRetriever(chain shared.ChainType, db *postgres.DB) (shared.CIDRetriever, error) { switch chain { @@ -139,52 +113,24 @@ func NewPayloadConverter(chainType shared.ChainType, chainID uint64) (shared.Pay } // NewIPLDFetcher constructs an IPLDFetcher for the provided chain type -func NewIPLDFetcher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDFetcher, error) { +func NewIPLDFetcher(chain shared.ChainType, db *postgres.DB) (shared.IPLDFetcher, error) { switch chain { case shared.Ethereum: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return eth.NewIPLDFetcher(ipfsPath) - case shared.DirectPostgres: - return eth.NewIPLDPGFetcher(db), nil - default: - return nil, fmt.Errorf("ethereum IPLDFetcher unexpected ipfs mode %s", ipfsMode.String()) - } + return eth.NewIPLDFetcher(db), nil case shared.Bitcoin: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return btc.NewIPLDFetcher(ipfsPath) - case shared.DirectPostgres: - return btc.NewIPLDPGFetcher(db), nil - default: - return nil, fmt.Errorf("bitcoin IPLDFetcher unexpected ipfs mode %s", ipfsMode.String()) - } + return btc.NewIPLDFetcher(db), nil default: return nil, fmt.Errorf("invalid chain %s for IPLD fetcher constructor", chain.String()) } } // NewIPLDPublisher constructs an IPLDPublisher for the provided chain type -func NewIPLDPublisher(chain shared.ChainType, ipfsPath string, db *postgres.DB, ipfsMode shared.IPFSMode) (shared.IPLDPublisher, error) { +func NewIPLDPublisher(chain shared.ChainType, db *postgres.DB) (shared.IPLDPublisher, error) { switch chain { case shared.Ethereum: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return eth.NewIPLDPublisher(ipfsPath) - case shared.DirectPostgres: - return eth.NewIPLDPublisherAndIndexer(db), nil - default: - return nil, fmt.Errorf("ethereum IPLDPublisher unexpected ipfs mode %s", ipfsMode.String()) - } + return eth.NewIPLDPublisher(db), nil case shared.Bitcoin: - switch ipfsMode { - case shared.LocalInterface, shared.RemoteClient: - return btc.NewIPLDPublisher(ipfsPath) - case shared.DirectPostgres: - return btc.NewIPLDPublisherAndIndexer(db), nil - default: - return nil, fmt.Errorf("bitcoin IPLDPublisher unexpected ipfs mode %s", ipfsMode.String()) - } + return btc.NewIPLDPublisher(db), nil default: return nil, fmt.Errorf("invalid chain %s for publisher constructor", chain.String()) } diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index edf4ec40..f41b3b44 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -84,8 +84,8 @@ var _ = Describe("API", func() { var ( db *postgres.DB retriever *eth.CIDRetriever - fetcher *eth.IPLDPGFetcher - indexAndPublisher *eth.IPLDPublisherAndIndexer + fetcher *eth.IPLDFetcher + indexAndPublisher *eth.IPLDPublisher backend *eth.Backend api *eth.PublicEthAPI ) @@ -94,15 +94,15 @@ var _ = Describe("API", func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) retriever = eth.NewCIDRetriever(db) - fetcher = eth.NewIPLDPGFetcher(db) - indexAndPublisher = eth.NewIPLDPublisherAndIndexer(db) + fetcher = eth.NewIPLDFetcher(db) + indexAndPublisher = eth.NewIPLDPublisher(db) backend = ð.Backend{ Retriever: retriever, Fetcher: fetcher, DB: db, } api = eth.NewPublicEthAPI(backend) - _, err = indexAndPublisher.Publish(mocks.MockConvertedPayload) + err = indexAndPublisher.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) uncles := mocks.MockBlock.Uncles() uncleHashes := make([]common.Hash, len(uncles)) diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 81ad8382..49c20b06 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -39,7 +39,7 @@ var ( type Backend struct { Retriever *CIDRetriever - Fetcher *IPLDPGFetcher + Fetcher *IPLDFetcher DB *postgres.DB } diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index 668d763d..20e2abc0 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -211,14 +211,14 @@ var ( var _ = Describe("Retriever", func() { var ( db *postgres.DB - repo *eth2.IPLDPublisherAndIndexer + repo *eth2.IPLDPublisher retriever *eth2.CIDRetriever ) BeforeEach(func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = eth2.NewIPLDPublisherAndIndexer(db) + repo = eth2.NewIPLDPublisher(db) retriever = eth2.NewCIDRetriever(db) }) AfterEach(func() { @@ -227,7 +227,7 @@ var _ = Describe("Retriever", func() { Describe("Retrieve", func() { BeforeEach(func() { - _, err := repo.Publish(mocks.MockConvertedPayload) + err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) }) It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { @@ -413,7 +413,7 @@ var _ = Describe("Retriever", func() { Expect(err).To(HaveOccurred()) }) It("Gets the number of the first block that has data in the database", func() { - _, err := repo.Publish(mocks.MockConvertedPayload) + err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -423,7 +423,7 @@ var _ = Describe("Retriever", func() { It("Gets the number of the first block that has data in the database", func() { payload := mocks.MockConvertedPayload payload.Block = newMockBlock(1010101) - _, err := repo.Publish(payload) + err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -435,9 +435,9 @@ var _ = Describe("Retriever", func() { payload1.Block = newMockBlock(1010101) payload2 := payload1 payload2.Block = newMockBlock(5) - _, err := repo.Publish(payload1) + err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -451,7 +451,7 @@ var _ = Describe("Retriever", func() { Expect(err).To(HaveOccurred()) }) It("Gets the number of the latest block that has data in the database", func() { - _, err := repo.Publish(mocks.MockConvertedPayload) + err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -461,7 +461,7 @@ var _ = Describe("Retriever", func() { It("Gets the number of the latest block that has data in the database", func() { payload := mocks.MockConvertedPayload payload.Block = newMockBlock(1010101) - _, err := repo.Publish(payload) + err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -473,9 +473,9 @@ var _ = Describe("Retriever", func() { payload1.Block = newMockBlock(1010101) payload2 := payload1 payload2.Block = newMockBlock(5) - _, err := repo.Publish(payload1) + err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -492,13 +492,13 @@ var _ = Describe("Retriever", func() { payload2.Block = newMockBlock(2) payload3 := payload2 payload3.Block = newMockBlock(3) - _, err := repo.Publish(payload0) + err := repo.Publish(payload0) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload1) + err = repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload3) + err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -508,7 +508,7 @@ var _ = Describe("Retriever", func() { It("Returns the gap from 0 to the earliest block", func() { payload := mocks.MockConvertedPayload payload.Block = newMockBlock(5) - _, err := repo.Publish(payload) + err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -523,11 +523,11 @@ var _ = Describe("Retriever", func() { payload1 := mocks.MockConvertedPayload payload3 := payload1 payload3.Block = newMockBlock(3) - _, err := repo.Publish(payload0) + err := repo.Publish(payload0) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload1) + err = repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload3) + err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -541,9 +541,9 @@ var _ = Describe("Retriever", func() { payload1.Block = newMockBlock(1010101) payload2 := payload1 payload2.Block = newMockBlock(0) - _, err := repo.Publish(payload1) + err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -576,27 +576,27 @@ var _ = Describe("Retriever", func() { payload11 := mocks.MockConvertedPayload payload11.Block = newMockBlock(1000) - _, err := repo.Publish(payload1) + err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload3) + err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload4) + err = repo.Publish(payload4) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload5) + err = repo.Publish(payload5) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload6) + err = repo.Publish(payload6) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload7) + err = repo.Publish(payload7) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload8) + err = repo.Publish(payload8) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload9) + err = repo.Publish(payload9) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload10) + err = repo.Publish(payload10) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload11) + err = repo.Publish(payload11) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) @@ -640,33 +640,33 @@ var _ = Describe("Retriever", func() { payload14 := mocks.MockConvertedPayload payload14.Block = newMockBlock(1000) - _, err := repo.Publish(payload1) + err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload2) + err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload3) + err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload4) + err = repo.Publish(payload4) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload5) + err = repo.Publish(payload5) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload6) + err = repo.Publish(payload6) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload7) + err = repo.Publish(payload7) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload8) + err = repo.Publish(payload8) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload9) + err = repo.Publish(payload9) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload10) + err = repo.Publish(payload10) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload11) + err = repo.Publish(payload11) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload12) + err = repo.Publish(payload12) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload13) + err = repo.Publish(payload13) Expect(err).ToNot(HaveOccurred()) - _, err = repo.Publish(payload14) + err = repo.Publish(payload14) Expect(err).ToNot(HaveOccurred()) cleaner := eth.NewCleaner(db) diff --git a/pkg/eth/helpers.go b/pkg/eth/helpers.go index ad08022c..73b1f0ca 100644 --- a/pkg/eth/helpers.go +++ b/pkg/eth/helpers.go @@ -18,6 +18,7 @@ package eth import ( "fmt" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/statediff" ) @@ -66,4 +67,4 @@ func ChainConfig(chainID uint64) (*params.ChainConfig, error) { default: return nil, fmt.Errorf("chain config for chainid %d not available", chainID) } -} \ No newline at end of file +} diff --git a/pkg/eth/ipld_fetcher.go b/pkg/eth/ipld_fetcher.go index bbfe6b3d..acea6197 100644 --- a/pkg/eth/ipld_fetcher.go +++ b/pkg/eth/ipld_fetcher.go @@ -30,21 +30,21 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum +// IPLDFetcher satisfies the IPLDFetcher interface for ethereum // It interfaces directly with PG-IPFS -type IPLDPGFetcher struct { +type IPLDFetcher struct { db *postgres.DB } -// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher -func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { - return &IPLDPGFetcher{ +// NewIPLDFetcher creates a pointer to a new IPLDFetcher +func NewIPLDFetcher(db *postgres.DB) *IPLDFetcher { + return &IPLDFetcher{ db: db, } } // Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { +func (f *IPLDFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { cidWrapper, ok := cids.(*CIDWrapper) if !ok { return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) @@ -100,7 +100,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) } // FetchHeaders fetches headers -func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { @@ -113,7 +113,7 @@ func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel } // FetchUncles fetches uncles -func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { log.Debug("fetching uncle iplds") uncleIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -130,7 +130,7 @@ func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.Bloc } // FetchTrxs fetches transactions -func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -147,7 +147,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode } // FetchRcts fetches receipts -func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { +func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { log.Debug("fetching receipt iplds") rctIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { @@ -164,7 +164,7 @@ func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.Bloc } // FetchState fetches state nodes -func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { +func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { log.Debug("fetching state iplds") stateNodes := make([]StateNode, 0, len(cids)) for _, stateNode := range cids { @@ -189,7 +189,7 @@ func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateN } // FetchStorage fetches storage nodes -func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { +func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { log.Debug("fetching storage iplds") storageNodes := make([]StorageNode, 0, len(cids)) for _, storageNode := range cids { diff --git a/pkg/eth/ipld_fetcher_test.go b/pkg/eth/ipld_fetcher_test.go index a3ee7c78..c1165d8c 100644 --- a/pkg/eth/ipld_fetcher_test.go +++ b/pkg/eth/ipld_fetcher_test.go @@ -28,20 +28,20 @@ import ( var ( db *postgres.DB - pubAndIndexer *eth.IPLDPublisherAndIndexer - fetcher *eth.IPLDPGFetcher + pubAndIndexer *eth.IPLDPublisher + fetcher *eth.IPLDFetcher ) -var _ = Describe("IPLDPGFetcher", func() { +var _ = Describe("IPLDFetcher", func() { Describe("Fetch", func() { BeforeEach(func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db) - _, err = pubAndIndexer.Publish(mocks.MockConvertedPayload) + pubAndIndexer = eth.NewIPLDPublisher(db) + err = pubAndIndexer.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) - fetcher = eth.NewIPLDPGFetcher(db) + fetcher = eth.NewIPLDFetcher(db) }) AfterEach(func() { eth.TearDownDB(db) diff --git a/pkg/eth/ipld_pg_fetcher.go b/pkg/eth/ipld_pg_fetcher.go deleted file mode 100644 index bbfe6b3d..00000000 --- a/pkg/eth/ipld_pg_fetcher.go +++ /dev/null @@ -1,215 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "errors" - "fmt" - "math/big" - - "github.com/ethereum/go-ethereum/common" - "github.com/jmoiron/sqlx" - log "github.com/sirupsen/logrus" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -// IPLDPGFetcher satisfies the IPLDFetcher interface for ethereum -// It interfaces directly with PG-IPFS -type IPLDPGFetcher struct { - db *postgres.DB -} - -// NewIPLDPGFetcher creates a pointer to a new IPLDPGFetcher -func NewIPLDPGFetcher(db *postgres.DB) *IPLDPGFetcher { - return &IPLDPGFetcher{ - db: db, - } -} - -// Fetch is the exported method for fetching and returning all the IPLDS specified in the CIDWrapper -func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) { - cidWrapper, ok := cids.(*CIDWrapper) - if !ok { - return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids) - } - log.Debug("fetching iplds") - iplds := IPLDs{} - iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10) - if !ok { - return nil, errors.New("eth fetcher: unable to set total difficulty") - } - iplds.BlockNumber = cidWrapper.BlockNumber - - tx, err := f.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error()) - } - iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error()) - } - iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error()) - } - iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error()) - } - iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error()) - } - iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes) - if err != nil { - return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error()) - } - return iplds, err -} - -// FetchHeaders fetches headers -func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { - log.Debug("fetching header ipld") - headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return ipfs.BlockModel{}, err - } - return ipfs.BlockModel{ - Data: headerBytes, - CID: c.CID, - }, nil -} - -// FetchUncles fetches uncles -func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching uncle iplds") - uncleIPLDs := make([]ipfs.BlockModel, len(cids)) - for i, c := range cids { - uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return nil, err - } - uncleIPLDs[i] = ipfs.BlockModel{ - Data: uncleBytes, - CID: c.CID, - } - } - return uncleIPLDs, nil -} - -// FetchTrxs fetches transactions -func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching transaction iplds") - trxIPLDs := make([]ipfs.BlockModel, len(cids)) - for i, c := range cids { - txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return nil, err - } - trxIPLDs[i] = ipfs.BlockModel{ - Data: txBytes, - CID: c.CID, - } - } - return trxIPLDs, nil -} - -// FetchRcts fetches receipts -func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.BlockModel, error) { - log.Debug("fetching receipt iplds") - rctIPLDs := make([]ipfs.BlockModel, len(cids)) - for i, c := range cids { - rctBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) - if err != nil { - return nil, err - } - rctIPLDs[i] = ipfs.BlockModel{ - Data: rctBytes, - CID: c.CID, - } - } - return rctIPLDs, nil -} - -// FetchState fetches state nodes -func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateNode, error) { - log.Debug("fetching state iplds") - stateNodes := make([]StateNode, 0, len(cids)) - for _, stateNode := range cids { - if stateNode.CID == "" { - continue - } - stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey) - if err != nil { - return nil, err - } - stateNodes = append(stateNodes, StateNode{ - IPLD: ipfs.BlockModel{ - Data: stateBytes, - CID: stateNode.CID, - }, - StateLeafKey: common.HexToHash(stateNode.StateKey), - Type: ResolveToNodeType(stateNode.NodeType), - Path: stateNode.Path, - }) - } - return stateNodes, nil -} - -// FetchStorage fetches storage nodes -func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKeyModel) ([]StorageNode, error) { - log.Debug("fetching storage iplds") - storageNodes := make([]StorageNode, 0, len(cids)) - for _, storageNode := range cids { - if storageNode.CID == "" || storageNode.StateKey == "" { - continue - } - storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey) - if err != nil { - return nil, err - } - storageNodes = append(storageNodes, StorageNode{ - IPLD: ipfs.BlockModel{ - Data: storageBytes, - CID: storageNode.CID, - }, - StateLeafKey: common.HexToHash(storageNode.StateKey), - StorageLeafKey: common.HexToHash(storageNode.StorageKey), - Type: ResolveToNodeType(storageNode.NodeType), - Path: storageNode.Path, - }) - } - return storageNodes, nil -} diff --git a/pkg/eth/ipld_pg_fetcher_test.go b/pkg/eth/ipld_pg_fetcher_test.go deleted file mode 100644 index a3ee7c78..00000000 --- a/pkg/eth/ipld_pg_fetcher_test.go +++ /dev/null @@ -1,65 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -var ( - db *postgres.DB - pubAndIndexer *eth.IPLDPublisherAndIndexer - fetcher *eth.IPLDPGFetcher -) - -var _ = Describe("IPLDPGFetcher", func() { - Describe("Fetch", func() { - BeforeEach(func() { - var err error - db, err = shared.SetupDB() - Expect(err).ToNot(HaveOccurred()) - pubAndIndexer = eth.NewIPLDPublisherAndIndexer(db) - _, err = pubAndIndexer.Publish(mocks.MockConvertedPayload) - Expect(err).ToNot(HaveOccurred()) - fetcher = eth.NewIPLDPGFetcher(db) - }) - AfterEach(func() { - eth.TearDownDB(db) - }) - - It("Fetches and returns IPLDs for the CIDs provided in the CIDWrapper", func() { - i, err := fetcher.Fetch(mocks.MockCIDWrapper) - Expect(err).ToNot(HaveOccurred()) - iplds, ok := i.(eth.IPLDs) - Expect(ok).To(BeTrue()) - Expect(iplds.TotalDifficulty).To(Equal(mocks.MockConvertedPayload.TotalDifficulty)) - Expect(iplds.BlockNumber).To(Equal(mocks.MockConvertedPayload.Block.Number())) - Expect(iplds.Header).To(Equal(mocks.MockIPLDs.Header)) - Expect(len(iplds.Uncles)).To(Equal(0)) - Expect(iplds.Transactions).To(Equal(mocks.MockIPLDs.Transactions)) - Expect(iplds.Receipts).To(Equal(mocks.MockIPLDs.Receipts)) - Expect(iplds.StateNodes).To(Equal(mocks.MockIPLDs.StateNodes)) - Expect(iplds.StorageNodes).To(Equal(mocks.MockIPLDs.StorageNodes)) - }) - }) -}) diff --git a/pkg/eth/mocks/publisher.go b/pkg/eth/mocks/publisher.go index a86f303e..c3e9a26a 100644 --- a/pkg/eth/mocks/publisher.go +++ b/pkg/eth/mocks/publisher.go @@ -32,13 +32,13 @@ type IPLDPublisher struct { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error { ipldPayload, ok := payload.(eth.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) + return fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = ipldPayload - return pub.ReturnCIDPayload, pub.ReturnErr + return pub.ReturnErr } // IterativeIPLDPublisher is the underlying struct for the Publisher interface; used in testing @@ -50,16 +50,12 @@ type IterativeIPLDPublisher struct { } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IterativeIPLDPublisher) Publish(payload shared.ConvertedData) error { ipldPayload, ok := payload.(eth.ConvertedPayload) if !ok { - return nil, fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) + return fmt.Errorf("publish expected payload type %T got %T", ð.ConvertedPayload{}, payload) } pub.PassedIPLDPayload = append(pub.PassedIPLDPayload, ipldPayload) - if len(pub.ReturnCIDPayload) < pub.iteration+1 { - return nil, fmt.Errorf("IterativeIPLDPublisher does not have a payload to return at iteration %d", pub.iteration) - } - returnPayload := pub.ReturnCIDPayload[pub.iteration] pub.iteration++ - return returnPayload, pub.ReturnErr + return pub.ReturnErr } diff --git a/pkg/eth/mocks/test_data.go b/pkg/eth/mocks/test_data.go index e4cf42bb..ecd62b49 100644 --- a/pkg/eth/mocks/test_data.go +++ b/pkg/eth/mocks/test_data.go @@ -63,6 +63,7 @@ var ( AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593") ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce()) ContractHash = crypto.Keccak256Hash(ContractAddress.Bytes()).String() + MockContractByteCode = []byte{0, 1, 2, 3, 4, 5} mockTopic11 = common.HexToHash("0x04") mockTopic12 = common.HexToHash("0x06") mockTopic21 = common.HexToHash("0x05") @@ -99,28 +100,34 @@ var ( StorageMhKey = shared.MultihashKeyFromCID(StorageCID) MockTrxMeta = []eth.TxModel{ { - CID: "", // This is empty until we go to publish to ipfs - MhKey: "", - Src: SenderAddr.Hex(), - Dst: Address.String(), - Index: 0, - TxHash: MockTransactions[0].Hash().String(), + CID: "", // This is empty until we go to publish to ipfs + MhKey: "", + Src: SenderAddr.Hex(), + Dst: Address.String(), + Index: 0, + TxHash: MockTransactions[0].Hash().String(), + Data: []byte{}, + Deployment: false, }, { - CID: "", - MhKey: "", - Src: SenderAddr.Hex(), - Dst: AnotherAddress.String(), - Index: 1, - TxHash: MockTransactions[1].Hash().String(), + CID: "", + MhKey: "", + Src: SenderAddr.Hex(), + Dst: AnotherAddress.String(), + Index: 1, + TxHash: MockTransactions[1].Hash().String(), + Data: []byte{}, + Deployment: false, }, { - CID: "", - MhKey: "", - Src: SenderAddr.Hex(), - Dst: "", - Index: 2, - TxHash: MockTransactions[2].Hash().String(), + CID: "", + MhKey: "", + Src: SenderAddr.Hex(), + Dst: "", + Index: 2, + TxHash: MockTransactions[2].Hash().String(), + Data: MockContractByteCode, + Deployment: true, }, } MockTrxMetaPostPublsh = []eth.TxModel{ @@ -532,7 +539,7 @@ func createTransactionsAndReceipts() (types.Transactions, types.Receipts, common // make transactions trx1 := types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), []byte{}) trx2 := types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), []byte{}) - trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), []byte{0, 1, 2, 3, 4, 5}) + trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), MockContractByteCode) transactionSigner := types.MakeSigner(params.MainnetChainConfig, new(big.Int).Set(BlockNumber)) mockCurve := elliptic.P256() mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) diff --git a/pkg/eth/models.go b/pkg/eth/models.go index b78eda5c..e5375d6c 100644 --- a/pkg/eth/models.go +++ b/pkg/eth/models.go @@ -51,16 +51,16 @@ type UncleModel struct { // TxModel is the db model for eth.transaction_cids type TxModel struct { - ID int64 `db:"id"` - HeaderID int64 `db:"header_id"` - Index int64 `db:"index"` - TxHash string `db:"tx_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - Dst string `db:"dst"` - Src string `db:"src"` - Data []byte `db:"data"` - Deployment bool `db:"deployment"` + ID int64 `db:"id"` + HeaderID int64 `db:"header_id"` + Index int64 `db:"index"` + TxHash string `db:"tx_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + Dst string `db:"dst"` + Src string `db:"src"` + Data []byte `db:"data"` + Deployment bool `db:"deployment"` } // ReceiptModel is the db model for eth.receipt_cids diff --git a/pkg/eth/publish_and_indexer.go b/pkg/eth/publish_and_indexer.go deleted file mode 100644 index eceaf59d..00000000 --- a/pkg/eth/publish_and_indexer.go +++ /dev/null @@ -1,228 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth - -import ( - "fmt" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/state" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/statediff" - "github.com/jmoiron/sqlx" - "github.com/multiformats/go-multihash" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for ethereum -// It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary -// It publishes and indexes IPLDs together in a single sqlx.Tx -type IPLDPublisherAndIndexer struct { - indexer *CIDIndexer -} - -// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface -func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { - return &IPLDPublisherAndIndexer{ - indexer: NewCIDIndexer(db), - } -} - -// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { - ipldPayload, ok := payload.(ConvertedPayload) - if !ok { - return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload) - } - // Generate the iplds - headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) - if err != nil { - return nil, err - } - - // Begin new db tx - tx, err := pub.indexer.db.Beginx() - if err != nil { - return nil, err - } - defer func() { - if p := recover(); p != nil { - shared.Rollback(tx) - panic(p) - } else if err != nil { - shared.Rollback(tx) - } else { - err = tx.Commit() - } - }() - - // Publish trie nodes - for _, node := range txTrieNodes { - if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err - } - } - for _, node := range rctTrieNodes { - if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err - } - } - - // Publish and index header - if err := shared.PublishIPLD(tx, headerNode); err != nil { - return nil, err - } - reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) - header := HeaderModel{ - CID: headerNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), - ParentHash: ipldPayload.Block.ParentHash().String(), - BlockNumber: ipldPayload.Block.Number().String(), - BlockHash: ipldPayload.Block.Hash().String(), - TotalDifficulty: ipldPayload.TotalDifficulty.String(), - Reward: reward.String(), - Bloom: ipldPayload.Block.Bloom().Bytes(), - StateRoot: ipldPayload.Block.Root().String(), - RctRoot: ipldPayload.Block.ReceiptHash().String(), - TxRoot: ipldPayload.Block.TxHash().String(), - UncleRoot: ipldPayload.Block.UncleHash().String(), - Timestamp: ipldPayload.Block.Time(), - } - headerID, err := pub.indexer.indexHeaderCID(tx, header) - if err != nil { - return nil, err - } - - // Publish and index uncles - for _, uncleNode := range uncleNodes { - if err := shared.PublishIPLD(tx, uncleNode); err != nil { - return nil, err - } - uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64()) - uncle := UncleModel{ - CID: uncleNode.Cid().String(), - MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), - ParentHash: uncleNode.ParentHash.String(), - BlockHash: uncleNode.Hash().String(), - Reward: uncleReward.String(), - } - if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil { - return nil, err - } - } - - // Publish and index txs and receipts - for i, txNode := range txNodes { - if err := shared.PublishIPLD(tx, txNode); err != nil { - return nil, err - } - rctNode := rctNodes[i] - if err := shared.PublishIPLD(tx, rctNode); err != nil { - return nil, err - } - txModel := ipldPayload.TxMetaData[i] - txModel.CID = txNode.Cid().String() - txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) - txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) - if err != nil { - return nil, err - } - rctModel := ipldPayload.ReceiptMetaData[i] - rctModel.CID = rctNode.Cid().String() - rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid()) - if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil { - return nil, err - } - } - - // Publish and index state and storage - err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID) - - // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer - return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer -} - -func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { - // Publish and index state and storage - for _, stateNode := range ipldPayload.StateNodes { - stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value) - if err != nil { - return err - } - mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr) - stateModel := StateNodeModel{ - Path: stateNode.Path, - StateKey: stateNode.LeafKey.String(), - CID: stateCIDStr, - MhKey: mhKey, - NodeType: ResolveFromNodeType(stateNode.Type), - } - stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID) - if err != nil { - return err - } - // If we have a leaf, decode and index the account data and any associated storage diffs - if stateNode.Type == statediff.Leaf { - var i []interface{} - if err := rlp.DecodeBytes(stateNode.Value, &i); err != nil { - return err - } - if len(i) != 2 { - return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") - } - var account state.Account - if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { - return err - } - accountModel := StateAccountModel{ - Balance: account.Balance.String(), - Nonce: account.Nonce, - CodeHash: account.CodeHash, - StorageRoot: account.Root.String(), - } - if err := pub.indexer.indexStateAccount(tx, accountModel, stateID); err != nil { - return err - } - for _, storageNode := range ipldPayload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { - storageCIDStr, err := shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.Value) - if err != nil { - return err - } - mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr) - storageModel := StorageNodeModel{ - Path: storageNode.Path, - StorageKey: storageNode.LeafKey.Hex(), - CID: storageCIDStr, - MhKey: mhKey, - NodeType: ResolveFromNodeType(storageNode.Type), - } - if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil { - return err - } - } - } - } - return nil -} - -// Index satisfies the shared.CIDIndexer interface -func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { - return nil -} diff --git a/pkg/eth/publish_and_indexer_test.go b/pkg/eth/publish_and_indexer_test.go deleted file mode 100644 index c90d10ae..00000000 --- a/pkg/eth/publish_and_indexer_test.go +++ /dev/null @@ -1,238 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package eth_test - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-ipfs-ds-help" - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/eth/mocks" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" -) - -var _ = Describe("PublishAndIndexer", func() { - var ( - db *postgres.DB - err error - repo *eth.IPLDPublisherAndIndexer - ipfsPgGet = `SELECT data FROM public.blocks - WHERE key = $1` - ) - BeforeEach(func() { - db, err = shared.SetupDB() - Expect(err).ToNot(HaveOccurred()) - repo = eth.NewIPLDPublisherAndIndexer(db) - }) - AfterEach(func() { - eth.TearDownDB(db) - }) - - Describe("Publish", func() { - It("Published and indexes header IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - pgStr := `SELECT cid, td, reward, id - FROM eth.header_cids - WHERE block_number = $1` - // check header was properly indexed - type res struct { - CID string - TD string - Reward string - ID int - } - header := new(res) - err = db.QueryRowx(pgStr, 1).StructScan(header) - Expect(err).ToNot(HaveOccurred()) - Expect(header.CID).To(Equal(mocks.HeaderCID.String())) - Expect(header.TD).To(Equal(mocks.MockBlock.Difficulty().String())) - Expect(header.Reward).To(Equal("5000000000000000000")) - dc, err := cid.Decode(header.CID) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - var data []byte - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - Expect(data).To(Equal(mocks.MockHeaderRlp)) - }) - - It("Publishes and indexes transaction IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - // check that txs were properly indexed - trxs := make([]string, 0) - pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $1` - err = db.Select(&trxs, pgStr, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(len(trxs)).To(Equal(3)) - Expect(shared.ListContainsString(trxs, mocks.Trx1CID.String())).To(BeTrue()) - Expect(shared.ListContainsString(trxs, mocks.Trx2CID.String())).To(BeTrue()) - Expect(shared.ListContainsString(trxs, mocks.Trx3CID.String())).To(BeTrue()) - // and published - for _, c := range trxs { - dc, err := cid.Decode(c) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - var data []byte - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - switch c { - case mocks.Trx1CID.String(): - Expect(data).To(Equal(mocks.MockTransactions.GetRlp(0))) - case mocks.Trx2CID.String(): - Expect(data).To(Equal(mocks.MockTransactions.GetRlp(1))) - case mocks.Trx3CID.String(): - Expect(data).To(Equal(mocks.MockTransactions.GetRlp(2))) - } - } - }) - - It("Publishes and indexes receipt IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - // check receipts were properly indexed - rcts := make([]string, 0) - pgStr := `SELECT receipt_cids.cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1` - err = db.Select(&rcts, pgStr, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(len(rcts)).To(Equal(3)) - Expect(shared.ListContainsString(rcts, mocks.Rct1CID.String())).To(BeTrue()) - Expect(shared.ListContainsString(rcts, mocks.Rct2CID.String())).To(BeTrue()) - Expect(shared.ListContainsString(rcts, mocks.Rct3CID.String())).To(BeTrue()) - // and published - for _, c := range rcts { - dc, err := cid.Decode(c) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - var data []byte - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - switch c { - case mocks.Rct1CID.String(): - Expect(data).To(Equal(mocks.MockReceipts.GetRlp(0))) - case mocks.Rct2CID.String(): - Expect(data).To(Equal(mocks.MockReceipts.GetRlp(1))) - case mocks.Rct3CID.String(): - Expect(data).To(Equal(mocks.MockReceipts.GetRlp(2))) - } - } - }) - - It("Publishes and indexes state IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - // check that state nodes were properly indexed and published - stateNodes := make([]eth.StateNodeModel, 0) - pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $1` - err = db.Select(&stateNodes, pgStr, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(len(stateNodes)).To(Equal(2)) - for _, stateNode := range stateNodes { - var data []byte - dc, err := cid.Decode(stateNode.CID) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` - var account eth.StateAccountModel - err = db.Get(&account, pgStr, stateNode.ID) - Expect(err).ToNot(HaveOccurred()) - if stateNode.CID == mocks.State1CID.String() { - Expect(stateNode.NodeType).To(Equal(2)) - Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.ContractLeafKey).Hex())) - Expect(stateNode.Path).To(Equal([]byte{'\x06'})) - Expect(data).To(Equal(mocks.ContractLeafNode)) - Expect(account).To(Equal(eth.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, - Balance: "0", - CodeHash: mocks.ContractCodeHash.Bytes(), - StorageRoot: mocks.ContractRoot, - Nonce: 1, - })) - } - if stateNode.CID == mocks.State2CID.String() { - Expect(stateNode.NodeType).To(Equal(2)) - Expect(stateNode.StateKey).To(Equal(common.BytesToHash(mocks.AccountLeafKey).Hex())) - Expect(stateNode.Path).To(Equal([]byte{'\x0c'})) - Expect(data).To(Equal(mocks.AccountLeafNode)) - Expect(account).To(Equal(eth.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, - Balance: "1000", - CodeHash: mocks.AccountCodeHash.Bytes(), - StorageRoot: mocks.AccountRoot, - Nonce: 0, - })) - } - } - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` - }) - - It("Publishes and indexes storage IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) - Expect(err).ToNot(HaveOccurred()) - // check that storage nodes were properly indexed - storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0) - pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path - FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id - AND header_cids.block_number = $1` - err = db.Select(&storageNodes, pgStr, 1) - Expect(err).ToNot(HaveOccurred()) - Expect(len(storageNodes)).To(Equal(1)) - Expect(storageNodes[0]).To(Equal(eth.StorageNodeWithStateKeyModel{ - CID: mocks.StorageCID.String(), - NodeType: 2, - StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), - StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), - Path: []byte{}, - })) - var data []byte - dc, err := cid.Decode(storageNodes[0].CID) - Expect(err).ToNot(HaveOccurred()) - mhKey := dshelp.MultihashToDsKey(dc.Hash()) - prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() - err = db.Get(&data, ipfsPgGet, prefixedKey) - Expect(err).ToNot(HaveOccurred()) - Expect(data).To(Equal(mocks.StorageLeafNode)) - }) - }) -}) diff --git a/pkg/eth/publisher.go b/pkg/eth/publisher.go index 9a500631..de8bdcc0 100644 --- a/pkg/eth/publisher.go +++ b/pkg/eth/publisher.go @@ -31,36 +31,36 @@ import ( "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) -// IPLDPublisherAndIndexer satisfies the IPLDPublisher interface for ethereum +// IPLDPublisher satisfies the IPLDPublisher interface for ethereum // It interfaces directly with the public.blocks table of PG-IPFS rather than going through an ipfs intermediary // It publishes and indexes IPLDs together in a single sqlx.Tx -type IPLDPublisherAndIndexer struct { +type IPLDPublisher struct { indexer *CIDIndexer } -// NewIPLDPublisherAndIndexer creates a pointer to a new IPLDPublisherAndIndexer which satisfies the IPLDPublisher interface -func NewIPLDPublisherAndIndexer(db *postgres.DB) *IPLDPublisherAndIndexer { - return &IPLDPublisherAndIndexer{ +// NewIPLDPublisher creates a pointer to a new IPLDPublisher which satisfies the IPLDPublisher interface +func NewIPLDPublisher(db *postgres.DB) *IPLDPublisher { + return &IPLDPublisher{ indexer: NewCIDIndexer(db), } } // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload -func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (shared.CIDsForIndexing, error) { +func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) error { ipldPayload, ok := payload.(ConvertedPayload) if !ok { - return nil, fmt.Errorf("eth IPLDPublisherAndIndexer expected payload type %T got %T", ConvertedPayload{}, payload) + return fmt.Errorf("eth IPLDPublisher expected payload type %T got %T", ConvertedPayload{}, payload) } // Generate the iplds headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, err := ipld.FromBlockAndReceipts(ipldPayload.Block, ipldPayload.Receipts) if err != nil { - return nil, err + return err } // Begin new db tx tx, err := pub.indexer.db.Beginx() if err != nil { - return nil, err + return err } defer func() { if p := recover(); p != nil { @@ -76,18 +76,18 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share // Publish trie nodes for _, node := range txTrieNodes { if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err + return err } } for _, node := range rctTrieNodes { if err := shared.PublishIPLD(tx, node); err != nil { - return nil, err + return err } } // Publish and index header if err := shared.PublishIPLD(tx, headerNode); err != nil { - return nil, err + return err } reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) header := HeaderModel{ @@ -107,13 +107,13 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share } headerID, err := pub.indexer.indexHeaderCID(tx, header) if err != nil { - return nil, err + return err } // Publish and index uncles for _, uncleNode := range uncleNodes { if err := shared.PublishIPLD(tx, uncleNode); err != nil { - return nil, err + return err } uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64()) uncle := UncleModel{ @@ -124,47 +124,47 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share Reward: uncleReward.String(), } if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil { - return nil, err + return err } } // Publish and index txs and receipts for i, txNode := range txNodes { if err := shared.PublishIPLD(tx, txNode); err != nil { - return nil, err + return err } rctNode := rctNodes[i] if err := shared.PublishIPLD(tx, rctNode); err != nil { - return nil, err + return err } txModel := ipldPayload.TxMetaData[i] txModel.CID = txNode.Cid().String() txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { - return nil, err + return err } + // If tx is a contract deployment, publish the data (code) if txModel.Deployment { if _, err = shared.PublishRaw(tx, ipld.MEthStorageTrie, multihash.KECCAK_256, txModel.Data); err != nil { - return nil, err + return err } } rctModel := ipldPayload.ReceiptMetaData[i] rctModel.CID = rctNode.Cid().String() rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid()) if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil { - return nil, err + return err } } // Publish and index state and storage err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID) - // This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer - return nil, err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer + return err // return err variable explicitly so that we return the err = tx.Commit() assignment in the defer } -func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { +func (pub *IPLDPublisher) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error { // Publish and index state and storage for _, stateNode := range ipldPayload.StateNodes { stateCIDStr, err := shared.PublishRaw(tx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.Value) @@ -190,7 +190,7 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, return err } if len(i) != 2 { - return fmt.Errorf("eth IPLDPublisherAndIndexer expected state leaf node rlp to decode into two elements") + return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements") } var account state.Account if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil { @@ -226,8 +226,3 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, } return nil } - -// Index satisfies the shared.CIDIndexer interface -func (pub *IPLDPublisherAndIndexer) Index(cids shared.CIDsForIndexing) error { - return nil -} diff --git a/pkg/eth/publisher_test.go b/pkg/eth/publisher_test.go index c90d10ae..559e9b5a 100644 --- a/pkg/eth/publisher_test.go +++ b/pkg/eth/publisher_test.go @@ -34,14 +34,14 @@ var _ = Describe("PublishAndIndexer", func() { var ( db *postgres.DB err error - repo *eth.IPLDPublisherAndIndexer + repo *eth.IPLDPublisher ipfsPgGet = `SELECT data FROM public.blocks WHERE key = $1` ) BeforeEach(func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = eth.NewIPLDPublisherAndIndexer(db) + repo = eth.NewIPLDPublisher(db) }) AfterEach(func() { eth.TearDownDB(db) @@ -49,8 +49,7 @@ var _ = Describe("PublishAndIndexer", func() { Describe("Publish", func() { It("Published and indexes header IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) pgStr := `SELECT cid, td, reward, id FROM eth.header_cids @@ -79,8 +78,7 @@ var _ = Describe("PublishAndIndexer", func() { }) It("Publishes and indexes transaction IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) // check that txs were properly indexed trxs := make([]string, 0) @@ -113,8 +111,7 @@ var _ = Describe("PublishAndIndexer", func() { }) It("Publishes and indexes receipt IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) // check receipts were properly indexed rcts := make([]string, 0) @@ -149,8 +146,7 @@ var _ = Describe("PublishAndIndexer", func() { }) It("Publishes and indexes state IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) // check that state nodes were properly indexed and published stateNodes := make([]eth.StateNodeModel, 0) @@ -205,8 +201,7 @@ var _ = Describe("PublishAndIndexer", func() { }) It("Publishes and indexes storage IPLDs in a single tx", func() { - emptyReturn, err := repo.Publish(mocks.MockConvertedPayload) - Expect(emptyReturn).To(BeNil()) + err = repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) // check that storage nodes were properly indexed storageNodes := make([]eth.StorageNodeWithStateKeyModel, 0) diff --git a/pkg/historical/config.go b/pkg/historical/config.go index bdb88481..c435f187 100644 --- a/pkg/historical/config.go +++ b/pkg/historical/config.go @@ -45,8 +45,6 @@ const ( // Config struct type Config struct { Chain shared.ChainType - IPFSPath string - IPFSMode shared.IPFSMode DBConfig config.Database DB *postgres.DB @@ -71,19 +69,7 @@ func NewConfig() (*Config, error) { return nil, err } - c.IPFSMode, err = shared.GetIPFSMode() - if err != nil { - return nil, err - } - if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient { - c.IPFSPath, err = shared.GetIPFSPath() - if err != nil { - return nil, err - } - } - c.DBConfig.Init() - if err := c.init(); err != nil { return nil, err } diff --git a/pkg/historical/service.go b/pkg/historical/service.go index c02c2a17..95173435 100644 --- a/pkg/historical/service.go +++ b/pkg/historical/service.go @@ -40,8 +40,6 @@ type BackFillService struct { Converter shared.PayloadConverter // Interface for publishing the IPLD payloads to IPFS Publisher shared.IPLDPublisher - // Interface for indexing the CIDs of the published IPLDs in Postgres - Indexer shared.CIDIndexer // Interface for searching and retrieving CIDs from Postgres index Retriever shared.CIDRetriever // Interface for fetching payloads over at historical blocks; over http @@ -64,11 +62,7 @@ type BackFillService struct { // NewBackFillService returns a new BackFillInterface func NewBackFillService(settings *Config, screenAndServeChan chan shared.ConvertedData) (BackFillInterface, error) { - publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) - if err != nil { - return nil, err - } - indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.DB) if err != nil { return nil, err } @@ -93,7 +87,6 @@ func NewBackFillService(settings *Config, screenAndServeChan chan shared.Convert batchNumber = shared.DefaultMaxBatchNumber } return &BackFillService{ - Indexer: indexer, Converter: converter, Publisher: publisher, Retriever: retriever, @@ -183,14 +176,10 @@ func (bfs *BackFillService) backFill(wg *sync.WaitGroup, id int, heightChan chan default: log.Debugf("%s backFill worker %d unable to forward converted payload to server; no channel ready to receive", bfs.chain.String(), id) } - cidPayload, err := bfs.Publisher.Publish(ipldPayload) - if err != nil { + if err := bfs.Publisher.Publish(ipldPayload); err != nil { log.Errorf("%s backFill worker %d publisher error: %s", bfs.chain.String(), id, err.Error()) continue } - if err := bfs.Indexer.Index(cidPayload); err != nil { - log.Errorf("%s backFill worker %d indexer error: %s", bfs.chain.String(), id, err.Error()) - } } log.Infof("%s backFill worker %d finished section from %d to %d", bfs.chain.String(), id, heights[0], heights[len(heights)-1]) case <-bfs.QuitChan: diff --git a/pkg/historical/service_test.go b/pkg/historical/service_test.go index 2e3ef81b..51cf3e23 100644 --- a/pkg/historical/service_test.go +++ b/pkg/historical/service_test.go @@ -33,9 +33,6 @@ import ( var _ = Describe("BackFiller", func() { Describe("FillGaps", func() { It("Periodically checks for and fills in gaps in the watcher's data", func() { - mockCidRepo := &mocks.CIDIndexer{ - ReturnErr: nil, - } mockPublisher := &mocks.IterativeIPLDPublisher{ ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload}, ReturnErr: nil, @@ -60,7 +57,6 @@ var _ = Describe("BackFiller", func() { } quitChan := make(chan bool, 1) backfiller := &historical.BackFillService{ - Indexer: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, Fetcher: mockFetcher, @@ -74,9 +70,6 @@ var _ = Describe("BackFiller", func() { backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true - Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) - Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) - Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload)) @@ -89,9 +82,6 @@ var _ = Describe("BackFiller", func() { }) It("Works for single block `ranges`", func() { - mockCidRepo := &mocks.CIDIndexer{ - ReturnErr: nil, - } mockPublisher := &mocks.IterativeIPLDPublisher{ ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload}, ReturnErr: nil, @@ -115,7 +105,6 @@ var _ = Describe("BackFiller", func() { } quitChan := make(chan bool, 1) backfiller := &historical.BackFillService{ - Indexer: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, Fetcher: mockFetcher, @@ -129,8 +118,6 @@ var _ = Describe("BackFiller", func() { backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true - Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(1)) - Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(1)) Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) Expect(len(mockConverter.PassedStatediffPayload)).To(Equal(1)) @@ -141,9 +128,6 @@ var _ = Describe("BackFiller", func() { }) It("Finds beginning gap", func() { - mockCidRepo := &mocks.CIDIndexer{ - ReturnErr: nil, - } mockPublisher := &mocks.IterativeIPLDPublisher{ ReturnCIDPayload: []*eth.CIDPayload{mocks.MockCIDPayload, mocks.MockCIDPayload}, ReturnErr: nil, @@ -169,7 +153,6 @@ var _ = Describe("BackFiller", func() { } quitChan := make(chan bool, 1) backfiller := &historical.BackFillService{ - Indexer: mockCidRepo, Publisher: mockPublisher, Converter: mockConverter, Fetcher: mockFetcher, @@ -183,9 +166,6 @@ var _ = Describe("BackFiller", func() { backfiller.BackFill(wg) time.Sleep(time.Second * 3) quitChan <- true - Expect(len(mockCidRepo.PassedCIDPayload)).To(Equal(2)) - Expect(mockCidRepo.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) - Expect(mockCidRepo.PassedCIDPayload[1]).To(Equal(mocks.MockCIDPayload)) Expect(len(mockPublisher.PassedIPLDPayload)).To(Equal(2)) Expect(mockPublisher.PassedIPLDPayload[0]).To(Equal(mocks.MockConvertedPayload)) Expect(mockPublisher.PassedIPLDPayload[1]).To(Equal(mocks.MockConvertedPayload)) diff --git a/pkg/ipfs/builders.go b/pkg/ipfs/builders.go deleted file mode 100644 index 317a9454..00000000 --- a/pkg/ipfs/builders.go +++ /dev/null @@ -1,90 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs - -import ( - "context" - - "github.com/sirupsen/logrus" - - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-ipfs/core" - "github.com/ipfs/go-ipfs/plugin/loader" - "github.com/ipfs/go-ipfs/repo/fsrepo" - ipld "github.com/ipfs/go-ipld-format" -) - -// InitIPFSPlugins is used to initialized IPFS plugins before creating a new IPFS node -// This should only be called once -func InitIPFSPlugins() error { - logrus.Debug("initializing IPFS plugins") - l, err := loader.NewPluginLoader("") - if err != nil { - return err - } - err = l.Initialize() - if err != nil { - return err - } - return l.Inject() -} - -// InitIPFSBlockService is used to configure and return a BlockService using an ipfs repo path (e.g. ~/.ipfs) -func InitIPFSBlockService(ipfsPath string) (blockservice.BlockService, error) { - logrus.Debug("initializing IPFS block service interface") - r, openErr := fsrepo.Open(ipfsPath) - if openErr != nil { - return nil, openErr - } - ctx := context.Background() - cfg := &core.BuildCfg{ - Online: false, - Repo: r, - } - ipfsNode, newNodeErr := core.NewNode(ctx, cfg) - if newNodeErr != nil { - return nil, newNodeErr - } - return ipfsNode.Blocks, nil -} - -type IPFS struct { - n *core.IpfsNode - ctx context.Context -} - -func (ipfs IPFS) Add(node ipld.Node) error { - return ipfs.n.DAG.Add(ipfs.n.Context(), node) -} - -func InitIPFSNode(repoPath string) (*IPFS, error) { - logrus.Debug("initializing IPFS node interface") - r, err := fsrepo.Open(repoPath) - if err != nil { - return nil, err - } - ctx := context.Background() - cfg := &core.BuildCfg{ - Online: false, - Repo: r, - } - ipfsNode, err := core.NewNode(ctx, cfg) - if err != nil { - return nil, err - } - return &IPFS{n: ipfsNode, ctx: ctx}, nil -} diff --git a/pkg/ipfs/dag_putters/btc_header.go b/pkg/ipfs/dag_putters/btc_header.go deleted file mode 100644 index e1ff52a1..00000000 --- a/pkg/ipfs/dag_putters/btc_header.go +++ /dev/null @@ -1,50 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -var ( - duplicateKeyErrorString = "pq: duplicate key value violates unique constraint" -) - -type BtcHeaderDagPutter struct { - adder *ipfs.IPFS -} - -func NewBtcHeaderDagPutter(adder *ipfs.IPFS) *BtcHeaderDagPutter { - return &BtcHeaderDagPutter{adder: adder} -} - -func (bhdp *BtcHeaderDagPutter) DagPut(n node.Node) (string, error) { - header, ok := n.(*ipld.BtcHeader) - if !ok { - return "", fmt.Errorf("BtcHeaderDagPutter expected input type %T got %T", &ipld.BtcHeader{}, n) - } - if err := bhdp.adder.Add(header); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return header.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/btc_tx.go b/pkg/ipfs/dag_putters/btc_tx.go deleted file mode 100644 index 97d23fca..00000000 --- a/pkg/ipfs/dag_putters/btc_tx.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type BtcTxDagPutter struct { - adder *ipfs.IPFS -} - -func NewBtcTxDagPutter(adder *ipfs.IPFS) *BtcTxDagPutter { - return &BtcTxDagPutter{adder: adder} -} - -func (etdp *BtcTxDagPutter) DagPut(n node.Node) (string, error) { - transaction, ok := n.(*ipld.BtcTx) - if !ok { - return "", fmt.Errorf("BtcTxDagPutter expected input type %T got %T", &ipld.BtcTx{}, n) - } - if err := etdp.adder.Add(transaction); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return transaction.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/btc_tx_trie.go b/pkg/ipfs/dag_putters/btc_tx_trie.go deleted file mode 100644 index 7d4623d5..00000000 --- a/pkg/ipfs/dag_putters/btc_tx_trie.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type BtcTxTrieDagPutter struct { - adder *ipfs.IPFS -} - -func NewBtcTxTrieDagPutter(adder *ipfs.IPFS) *BtcTxTrieDagPutter { - return &BtcTxTrieDagPutter{adder: adder} -} - -func (etdp *BtcTxTrieDagPutter) DagPut(n node.Node) (string, error) { - txTrieNode, ok := n.(*ipld.BtcTxTrie) - if !ok { - return "", fmt.Errorf("BtcTxTrieDagPutter expected input type %T got %T", &ipld.BtcTxTrie{}, n) - } - if err := etdp.adder.Add(txTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return txTrieNode.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_header.go b/pkg/ipfs/dag_putters/eth_header.go deleted file mode 100644 index fd368f7c..00000000 --- a/pkg/ipfs/dag_putters/eth_header.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthHeaderDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthBlockHeaderDagPutter(adder *ipfs.IPFS) *EthHeaderDagPutter { - return &EthHeaderDagPutter{adder: adder} -} - -func (bhdp *EthHeaderDagPutter) DagPut(n node.Node) (string, error) { - header, ok := n.(*ipld.EthHeader) - if !ok { - return "", fmt.Errorf("EthHeaderDagPutter expected input type %T got %T", &ipld.EthHeader{}, n) - } - if err := bhdp.adder.Add(header); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return header.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_receipt.go b/pkg/ipfs/dag_putters/eth_receipt.go deleted file mode 100644 index 35ecb279..00000000 --- a/pkg/ipfs/dag_putters/eth_receipt.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthReceiptDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthReceiptDagPutter(adder *ipfs.IPFS) *EthReceiptDagPutter { - return &EthReceiptDagPutter{adder: adder} -} - -func (erdp *EthReceiptDagPutter) DagPut(n node.Node) (string, error) { - receipt, ok := n.(*ipld.EthReceipt) - if !ok { - return "", fmt.Errorf("EthReceiptDagPutter expected input type %T got type %T", &ipld.EthReceipt{}, n) - } - if err := erdp.adder.Add(receipt); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return receipt.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_receipt_trie.go b/pkg/ipfs/dag_putters/eth_receipt_trie.go deleted file mode 100644 index 531f5303..00000000 --- a/pkg/ipfs/dag_putters/eth_receipt_trie.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthRctTrieDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthRctTrieDagPutter(adder *ipfs.IPFS) *EthRctTrieDagPutter { - return &EthRctTrieDagPutter{adder: adder} -} - -func (etdp *EthRctTrieDagPutter) DagPut(n node.Node) (string, error) { - rctTrieNode, ok := n.(*ipld.EthRctTrie) - if !ok { - return "", fmt.Errorf("EthRctTrieDagPutter expected input type %T got %T", &ipld.EthRctTrie{}, n) - } - if err := etdp.adder.Add(rctTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return rctTrieNode.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_state.go b/pkg/ipfs/dag_putters/eth_state.go deleted file mode 100644 index 50d39092..00000000 --- a/pkg/ipfs/dag_putters/eth_state.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthStateDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthStateDagPutter(adder *ipfs.IPFS) *EthStateDagPutter { - return &EthStateDagPutter{adder: adder} -} - -func (erdp *EthStateDagPutter) DagPut(n node.Node) (string, error) { - stateNode, ok := n.(*ipld.EthStateTrie) - if !ok { - return "", fmt.Errorf("EthStateDagPutter expected input type %T got %T", &ipld.EthStateTrie{}, n) - } - if err := erdp.adder.Add(stateNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return stateNode.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_storage.go b/pkg/ipfs/dag_putters/eth_storage.go deleted file mode 100644 index 1f6b8acd..00000000 --- a/pkg/ipfs/dag_putters/eth_storage.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthStorageDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthStorageDagPutter(adder *ipfs.IPFS) *EthStorageDagPutter { - return &EthStorageDagPutter{adder: adder} -} - -func (erdp *EthStorageDagPutter) DagPut(n node.Node) (string, error) { - storageNode, ok := n.(*ipld.EthStorageTrie) - if !ok { - return "", fmt.Errorf("EthStorageDagPutter expected input type %T got %T", &ipld.EthStorageTrie{}, n) - } - if err := erdp.adder.Add(storageNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return storageNode.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_tx.go b/pkg/ipfs/dag_putters/eth_tx.go deleted file mode 100644 index d4ef103f..00000000 --- a/pkg/ipfs/dag_putters/eth_tx.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthTxsDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthTxsDagPutter(adder *ipfs.IPFS) *EthTxsDagPutter { - return &EthTxsDagPutter{adder: adder} -} - -func (etdp *EthTxsDagPutter) DagPut(n node.Node) (string, error) { - transaction, ok := n.(*ipld.EthTx) - if !ok { - return "", fmt.Errorf("EthTxsDagPutter expected input type %T got %T", &ipld.EthTx{}, n) - } - if err := etdp.adder.Add(transaction); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return transaction.Cid().String(), nil -} diff --git a/pkg/ipfs/dag_putters/eth_tx_trie.go b/pkg/ipfs/dag_putters/eth_tx_trie.go deleted file mode 100644 index e802d7f3..00000000 --- a/pkg/ipfs/dag_putters/eth_tx_trie.go +++ /dev/null @@ -1,46 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package dag_putters - -import ( - "fmt" - "strings" - - node "github.com/ipfs/go-ipld-format" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" -) - -type EthTxTrieDagPutter struct { - adder *ipfs.IPFS -} - -func NewEthTxTrieDagPutter(adder *ipfs.IPFS) *EthTxTrieDagPutter { - return &EthTxTrieDagPutter{adder: adder} -} - -func (etdp *EthTxTrieDagPutter) DagPut(n node.Node) (string, error) { - txTrieNode, ok := n.(*ipld.EthTxTrie) - if !ok { - return "", fmt.Errorf("EthTxTrieDagPutter expected input type %T got %T", &ipld.EthTxTrie{}, n) - } - if err := etdp.adder.Add(txTrieNode); err != nil && !strings.Contains(err.Error(), duplicateKeyErrorString) { - return "", err - } - return txTrieNode.Cid().String(), nil -} diff --git a/pkg/ipfs/interfaces.go b/pkg/ipfs/interfaces.go deleted file mode 100644 index cb0f25d9..00000000 --- a/pkg/ipfs/interfaces.go +++ /dev/null @@ -1,26 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipfs - -import ( - ipld "github.com/ipfs/go-ipld-format" -) - -// DagPutter is a general interface for a dag putter -type DagPutter interface { - DagPut(n ipld.Node) (string, error) -} diff --git a/pkg/ipfs/mocks/blockservice.go b/pkg/ipfs/mocks/blockservice.go deleted file mode 100644 index fdab2fd9..00000000 --- a/pkg/ipfs/mocks/blockservice.go +++ /dev/null @@ -1,86 +0,0 @@ -package mocks - -import ( - "context" - "errors" - - "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-ipfs-exchange-interface" -) - -// MockIPFSBlockService is a mock for testing the ipfs fetcher -type MockIPFSBlockService struct { - Blocks map[cid.Cid]blocks.Block -} - -// GetBlock is used to retrieve a block from the mock BlockService -func (bs *MockIPFSBlockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { - if bs.Blocks == nil { - return nil, errors.New("BlockService has not been initialized") - } - blk, ok := bs.Blocks[c] - if ok { - return blk, nil - } - return nil, nil -} - -// GetBlocks is used to retrieve a set of blocks from the mock BlockService -func (bs *MockIPFSBlockService) GetBlocks(ctx context.Context, cs []cid.Cid) <-chan blocks.Block { - if bs.Blocks == nil { - panic("BlockService has not been initialized") - } - blkChan := make(chan blocks.Block) - go func() { - for _, c := range cs { - blk, ok := bs.Blocks[c] - if ok { - blkChan <- blk - } - } - close(blkChan) - }() - return blkChan -} - -// AddBlock adds a block to the mock BlockService -func (bs *MockIPFSBlockService) AddBlock(blk blocks.Block) error { - if bs.Blocks == nil { - bs.Blocks = make(map[cid.Cid]blocks.Block) - } - bs.Blocks[blk.Cid()] = blk - return nil -} - -// AddBlocks adds a set of blocks to the mock BlockService -func (bs *MockIPFSBlockService) AddBlocks(blks []blocks.Block) error { - if bs.Blocks == nil { - bs.Blocks = make(map[cid.Cid]blocks.Block) - } - for _, block := range blks { - bs.Blocks[block.Cid()] = block - } - return nil -} - -// Close is here to satisfy the interface -func (*MockIPFSBlockService) Close() error { - panic("implement me") -} - -// Blockstore is here to satisfy the interface -func (*MockIPFSBlockService) Blockstore() blockstore.Blockstore { - panic("implement me") -} - -// DeleteBlock is here to satisfy the interface -func (*MockIPFSBlockService) DeleteBlock(c cid.Cid) error { - panic("implement me") -} - -// Exchange is here to satisfy the interface -func (*MockIPFSBlockService) Exchange() exchange.Interface { - panic("implement me") -} diff --git a/pkg/ipfs/mocks/dag_putters.go b/pkg/ipfs/mocks/dag_putters.go deleted file mode 100644 index dd7a9380..00000000 --- a/pkg/ipfs/mocks/dag_putters.go +++ /dev/null @@ -1,53 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package mocks - -import ( - "errors" - - node "github.com/ipfs/go-ipld-format" - - "github.com/ethereum/go-ethereum/common" -) - -// DagPutter is a mock for testing the ipfs publisher -type DagPutter struct { - PassedNode node.Node - ErrToReturn error -} - -// DagPut returns the pre-loaded CIDs or error -func (dp *DagPutter) DagPut(n node.Node) (string, error) { - dp.PassedNode = n - return n.Cid().String(), dp.ErrToReturn -} - -// MappedDagPutter is a mock for testing the ipfs publisher -type MappedDagPutter struct { - CIDsToReturn map[common.Hash]string - PassedNode node.Node - ErrToReturn error -} - -// DagPut returns the pre-loaded CIDs or error -func (mdp *MappedDagPutter) DagPut(n node.Node) (string, error) { - if mdp.CIDsToReturn == nil { - return "", errors.New("mapped dag putter needs to be initialized with a map of cids to return") - } - hash := common.BytesToHash(n.RawData()) - return mdp.CIDsToReturn[hash], nil -} diff --git a/pkg/resync/config.go b/pkg/resync/config.go index 1451b2a6..c68ad7f6 100644 --- a/pkg/resync/config.go +++ b/pkg/resync/config.go @@ -51,8 +51,6 @@ type Config struct { // DB info DB *postgres.DB DBConfig config.Database - IPFSPath string - IPFSMode shared.IPFSMode HTTPClient interface{} // Note this client is expected to support the retrieval of the specified data type(s) NodeInfo node.Node // Info for the associated node @@ -91,16 +89,6 @@ func NewConfig() (*Config, error) { c.ClearOldCache = viper.GetBool("resync.clearOldCache") c.ResetValidation = viper.GetBool("resync.resetValidation") - c.IPFSMode, err = shared.GetIPFSMode() - if err != nil { - return nil, err - } - if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient { - c.IPFSPath, err = shared.GetIPFSPath() - if err != nil { - return nil, err - } - } resyncType := viper.GetString("resync.type") c.ResyncType, err = shared.GenerateDataTypeFromString(resyncType) if err != nil { diff --git a/pkg/resync/service.go b/pkg/resync/service.go index 08d22305..1dc39753 100644 --- a/pkg/resync/service.go +++ b/pkg/resync/service.go @@ -35,8 +35,6 @@ type Service struct { Converter shared.PayloadConverter // Interface for publishing the IPLD payloads to IPFS Publisher shared.IPLDPublisher - // Interface for indexing the CIDs of the published IPLDs in Postgres - Indexer shared.CIDIndexer // Interface for searching and retrieving CIDs from Postgres index Retriever shared.CIDRetriever // Interface for fetching payloads over at historical blocks; over http @@ -63,11 +61,7 @@ type Service struct { // NewResyncService creates and returns a resync service from the provided settings func NewResyncService(settings *Config) (Resync, error) { - publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.DB, settings.IPFSMode) - if err != nil { - return nil, err - } - indexer, err := builders.NewCIDIndexer(settings.Chain, settings.DB, settings.IPFSMode) + publisher, err := builders.NewIPLDPublisher(settings.Chain, settings.DB) if err != nil { return nil, err } @@ -96,7 +90,6 @@ func NewResyncService(settings *Config) (Resync, error) { batchNumber = shared.DefaultMaxBatchNumber } return &Service{ - Indexer: indexer, Converter: converter, Publisher: publisher, Retriever: retriever, @@ -168,13 +161,9 @@ func (rs *Service) resync(id int, heightChan chan []uint64) { if err != nil { logrus.Errorf("%s resync worker %d converter error: %s", rs.chain.String(), id, err.Error()) } - cidPayload, err := rs.Publisher.Publish(ipldPayload) - if err != nil { + if err := rs.Publisher.Publish(ipldPayload); err != nil { logrus.Errorf("%s resync worker %d publisher error: %s", rs.chain.String(), id, err.Error()) } - if err := rs.Indexer.Index(cidPayload); err != nil { - logrus.Errorf("%s resync worker %d indexer error: %s", rs.chain.String(), id, err.Error()) - } } logrus.Infof("%s resync worker %d finished section from %d to %d", rs.chain.String(), id, heights[0], heights[len(heights)-1]) case <-rs.quitChan: diff --git a/pkg/shared/env.go b/pkg/shared/env.go index b5a53e77..b11c726e 100644 --- a/pkg/shared/env.go +++ b/pkg/shared/env.go @@ -17,9 +17,6 @@ package shared import ( - "os" - "path/filepath" - "github.com/ethereum/go-ethereum/rpc" "github.com/btcsuite/btcd/rpcclient" @@ -29,8 +26,6 @@ import ( // Env variables const ( - IPFS_PATH = "IPFS_PATH" - IPFS_MODE = "IPFS_MODE" HTTP_TIMEOUT = "HTTP_TIMEOUT" ETH_WS_PATH = "ETH_WS_PATH" @@ -73,30 +68,6 @@ func GetEthNodeAndClient(path string) (node.Node, *rpc.Client, error) { }, rpcClient, nil } -// GetIPFSPath returns the ipfs path from the config or env variable -func GetIPFSPath() (string, error) { - viper.BindEnv("ipfs.path", IPFS_PATH) - ipfsPath := viper.GetString("ipfs.path") - if ipfsPath == "" { - home, err := os.UserHomeDir() - if err != nil { - return "", err - } - ipfsPath = filepath.Join(home, ".ipfs") - } - return ipfsPath, nil -} - -// GetIPFSMode returns the ipfs mode of operation from the config or env variable -func GetIPFSMode() (IPFSMode, error) { - viper.BindEnv("ipfs.mode", IPFS_MODE) - ipfsMode := viper.GetString("ipfs.mode") - if ipfsMode == "" { - return DirectPostgres, nil - } - return NewIPFSMode(ipfsMode) -} - // GetBtcNodeAndClient returns btc node info from path url func GetBtcNodeAndClient(path string) (node.Node, *rpcclient.ConnConfig) { viper.BindEnv("bitcoin.nodeID", BTC_NODE_ID) diff --git a/pkg/shared/intefaces.go b/pkg/shared/intefaces.go index 3c2e0110..03839b4d 100644 --- a/pkg/shared/intefaces.go +++ b/pkg/shared/intefaces.go @@ -37,12 +37,7 @@ type PayloadConverter interface { // IPLDPublisher publishes IPLD payloads and returns a CID payload for indexing type IPLDPublisher interface { - Publish(payload ConvertedData) (CIDsForIndexing, error) -} - -// CIDIndexer indexes a CID payload in Postgres -type CIDIndexer interface { - Index(cids CIDsForIndexing) error + Publish(payload ConvertedData) error } // ResponseFilterer applies a filter to an IPLD payload to return a subscription response packet diff --git a/pkg/shared/ipfs_mode.go b/pkg/shared/ipfs_mode.go deleted file mode 100644 index 14d7d856..00000000 --- a/pkg/shared/ipfs_mode.go +++ /dev/null @@ -1,58 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package shared - -import ( - "errors" - "strings" -) - -// IPFSMode enum for specifying how we want to interface and publish objects to IPFS -type IPFSMode int - -const ( - Unknown IPFSMode = iota - LocalInterface - RemoteClient - DirectPostgres -) - -func (c IPFSMode) String() string { - switch c { - case LocalInterface: - return "Local" - case RemoteClient: - return "Remote" - case DirectPostgres: - return "Postgres" - default: - return "" - } -} - -func NewIPFSMode(name string) (IPFSMode, error) { - switch strings.ToLower(name) { - case "local", "interface": - return LocalInterface, nil - case "remote", "client": - return RemoteClient, errors.New("remote IPFS client mode is not currently supported") - case "postgres", "direct": - return DirectPostgres, nil - default: - return Unknown, errors.New("unrecognized name for ipfs mode") - } -} diff --git a/pkg/watch/config.go b/pkg/watch/config.go index 267c3887..2f8bac20 100644 --- a/pkg/watch/config.go +++ b/pkg/watch/config.go @@ -53,8 +53,6 @@ const ( // Config struct type Config struct { Chain shared.ChainType - IPFSPath string - IPFSMode shared.IPFSMode DBConfig config.Database // Server fields Serve bool @@ -96,19 +94,7 @@ func NewConfig() (*Config, error) { return nil, err } - c.IPFSMode, err = shared.GetIPFSMode() - if err != nil { - return nil, err - } - if c.IPFSMode == shared.LocalInterface || c.IPFSMode == shared.RemoteClient { - c.IPFSPath, err = shared.GetIPFSPath() - if err != nil { - return nil, err - } - } - c.DBConfig.Init() - c.Sync = viper.GetBool("watcher.sync") if c.Sync { workers := viper.GetInt("watcher.workers") diff --git a/pkg/watch/service.go b/pkg/watch/service.go index 43a493b0..74cb7422 100644 --- a/pkg/watch/service.go +++ b/pkg/watch/service.go @@ -66,10 +66,8 @@ type Service struct { Streamer shared.PayloadStreamer // Interface for converting raw payloads into IPLD object payloads Converter shared.PayloadConverter - // Interface for publishing the IPLD payloads to IPFS + // Interface for publishing and indexing the PG-IPLD payloads Publisher shared.IPLDPublisher - // Interface for indexing the CIDs of the published IPLDs in Postgres - Indexer shared.CIDIndexer // Interface for filtering and serving data according to subscribed clients according to their specification Filterer shared.ResponseFilterer // Interface for fetching IPLD objects from IPFS @@ -86,12 +84,10 @@ type Service struct { SubscriptionTypes map[common.Hash]shared.SubscriptionSettings // Info for the Geth node that this watcher is working with NodeInfo *node.Node - // Number of publishAndIndex workers + // Number of publish workers WorkerPoolSize int // chain type for this service chain shared.ChainType - // Path to ipfs data dir - ipfsPath string // Underlying db db *postgres.DB // wg for syncing serve processes @@ -112,11 +108,7 @@ func NewWatcher(settings *Config) (Watcher, error) { if err != nil { return nil, err } - sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.IPFSPath, settings.SyncDBConn, settings.IPFSMode) - if err != nil { - return nil, err - } - sn.Indexer, err = builders.NewCIDIndexer(settings.Chain, settings.SyncDBConn, settings.IPFSMode) + sn.Publisher, err = builders.NewIPLDPublisher(settings.Chain, settings.SyncDBConn) if err != nil { return nil, err } @@ -131,7 +123,7 @@ func NewWatcher(settings *Config) (Watcher, error) { if err != nil { return nil, err } - sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.IPFSPath, settings.ServeDBConn, settings.IPFSMode) + sn.IPLDFetcher, err = builders.NewIPLDFetcher(settings.Chain, settings.ServeDBConn) if err != nil { return nil, err } @@ -142,7 +134,6 @@ func NewWatcher(settings *Config) (Watcher, error) { sn.SubscriptionTypes = make(map[common.Hash]shared.SubscriptionSettings) sn.WorkerPoolSize = settings.Workers sn.NodeInfo = &settings.NodeInfo - sn.ipfsPath = settings.IPFSPath sn.chain = settings.Chain return sn, nil } @@ -190,7 +181,7 @@ func (sap *Service) APIs() []rpc.API { } // Sync streams incoming raw chain data and converts it for further processing -// It forwards the converted data to the publishAndIndex process(es) it spins up +// It forwards the converted data to the publish process(es) it spins up // If forwards the converted data to a ScreenAndServe process if it there is one listening on the passed screenAndServePayload channel // This continues on no matter if or how many subscribers there are func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared.ConvertedData) error { @@ -198,11 +189,11 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared if err != nil { return err } - // spin up publishAndIndex worker goroutines - publishAndIndexPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) + // spin up publish worker goroutines + publishPayload := make(chan shared.ConvertedData, PayloadChanBufferSize) for i := 1; i <= sap.WorkerPoolSize; i++ { - go sap.publishAndIndex(wg, i, publishAndIndexPayload) - log.Debugf("%s publishAndIndex worker %d successfully spun up", sap.chain.String(), i) + go sap.publish(wg, i, publishPayload) + log.Debugf("%s publish worker %d successfully spun up", sap.chain.String(), i) } go func() { wg.Add(1) @@ -221,13 +212,13 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared case screenAndServePayload <- ipldPayload: default: } - // Forward the payload to the publishAndIndex workers + // Forward the payload to the publish workers // this channel acts as a ring buffer select { - case publishAndIndexPayload <- ipldPayload: + case publishPayload <- ipldPayload: default: - <-publishAndIndexPayload - publishAndIndexPayload <- ipldPayload + <-publishPayload + publishPayload <- ipldPayload } case err := <-sub.Err(): log.Errorf("watcher subscription error for chain %s: %v", sap.chain.String(), err) @@ -241,26 +232,21 @@ func (sap *Service) Sync(wg *sync.WaitGroup, screenAndServePayload chan<- shared return nil } -// publishAndIndex is spun up by SyncAndConvert and receives converted chain data from that process +// publish is spun up by SyncAndConvert and receives converted chain data from that process // it publishes this data to IPFS and indexes their CIDs with useful metadata in Postgres -func (sap *Service) publishAndIndex(wg *sync.WaitGroup, id int, publishAndIndexPayload <-chan shared.ConvertedData) { +func (sap *Service) publish(wg *sync.WaitGroup, id int, publishPayload <-chan shared.ConvertedData) { wg.Add(1) defer wg.Done() for { select { - case payload := <-publishAndIndexPayload: - log.Debugf("%s watcher publishAndIndex worker %d publishing data streamed at head height %d", sap.chain.String(), id, payload.Height()) - cidPayload, err := sap.Publisher.Publish(payload) - if err != nil { - log.Errorf("%s watcher publishAndIndex worker %d publishing error: %v", sap.chain.String(), id, err) + case payload := <-publishPayload: + log.Debugf("%s watcher sync worker %d publishing and indexing data streamed at head height %d", sap.chain.String(), id, payload.Height()) + if err := sap.Publisher.Publish(payload); err != nil { + log.Errorf("%s watcher publish worker %d publishing error: %v", sap.chain.String(), id, err) continue } - log.Debugf("%s watcher publishAndIndex worker %d indexing data streamed at head height %d", sap.chain.String(), id, payload.Height()) - if err := sap.Indexer.Index(cidPayload); err != nil { - log.Errorf("%s watcher publishAndIndex worker %d indexing error: %v", sap.chain.String(), id, err) - } case <-sap.QuitChan: - log.Infof("%s watcher publishAndIndex worker %d shutting down", sap.chain.String(), id) + log.Infof("%s watcher publish worker %d shutting down", sap.chain.String(), id) return } } diff --git a/pkg/watch/service_test.go b/pkg/watch/service_test.go index b94b5a8a..d2902c88 100644 --- a/pkg/watch/service_test.go +++ b/pkg/watch/service_test.go @@ -36,9 +36,6 @@ var _ = Describe("Service", func() { wg := new(sync.WaitGroup) payloadChan := make(chan shared.RawChainData, 1) quitChan := make(chan bool, 1) - mockCidIndexer := &mocks.CIDIndexer{ - ReturnErr: nil, - } mockPublisher := &mocks.IPLDPublisher{ ReturnCIDPayload: mocks.MockCIDPayload, ReturnErr: nil, @@ -55,7 +52,6 @@ var _ = Describe("Service", func() { ReturnErr: nil, } processor := &watch.Service{ - Indexer: mockCidIndexer, Publisher: mockPublisher, Streamer: mockStreamer, Converter: mockConverter, @@ -69,8 +65,6 @@ var _ = Describe("Service", func() { close(quitChan) wg.Wait() Expect(mockConverter.PassedStatediffPayload).To(Equal(mocks.MockStateDiffPayload)) - Expect(len(mockCidIndexer.PassedCIDPayload)).To(Equal(1)) - Expect(mockCidIndexer.PassedCIDPayload[0]).To(Equal(mocks.MockCIDPayload)) Expect(mockPublisher.PassedIPLDPayload).To(Equal(mocks.MockConvertedPayload)) Expect(mockStreamer.PassedPayloadChan).To(Equal(payloadChan)) })