diff --git a/pkg/btc/cid_retriever.go b/pkg/btc/cid_retriever.go index b52836e2..78acd489 100644 --- a/pkg/btc/cid_retriever.go +++ b/pkg/btc/cid_retriever.go @@ -130,7 +130,7 @@ func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID results := make([]TxModel, 0) id := 1 pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, - transaction_cids.tx_hash, transaction_cids.cid, + transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.segwit, transaction_cids.witness_hash, transaction_cids.index FROM btc.transaction_cids, btc.header_cids, btc.tx_inputs, btc.tx_outputs WHERE transaction_cids.header_id = header_cids.id diff --git a/pkg/btc/cleaner.go b/pkg/btc/cleaner.go index a9f83b2d..0b9d0167 100644 --- a/pkg/btc/cleaner.go +++ b/pkg/btc/cleaner.go @@ -160,7 +160,7 @@ func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING btc.transaction_cids B, btc.header_cids C - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.header_id = C.id AND C.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) @@ -179,7 +179,7 @@ func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING btc.header_cids B - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) return err diff --git a/pkg/btc/cleaner_test.go b/pkg/btc/cleaner_test.go index d468367e..c8e43320 100644 --- a/pkg/btc/cleaner_test.go +++ b/pkg/btc/cleaner_test.go @@ -33,25 +33,30 @@ var ( // header variables blockHash1 = crypto.Keccak256Hash([]byte{00, 02}) blocKNumber1 = big.NewInt(0) - headerCid1 = "mockHeader1CID" + headerCid1 = shared.TestCID([]byte("mockHeader1CID")) + headerMhKey1 = shared.MultihashKeyFromCID(headerCid1) parentHash = crypto.Keccak256Hash([]byte{00, 01}) headerModel1 = btc.HeaderModel{ BlockHash: blockHash1.String(), BlockNumber: blocKNumber1.String(), ParentHash: parentHash.String(), - CID: headerCid1, + CID: headerCid1.String(), + MhKey: headerMhKey1, } // tx variables - tx1CID = "mockTx1CID" - tx2CID = "mockTx2CID" + tx1CID = shared.TestCID([]byte("mockTx1CID")) + tx1MhKey = shared.MultihashKeyFromCID(tx1CID) + tx2CID = shared.TestCID([]byte("mockTx2CID")) + tx2MhKey = shared.MultihashKeyFromCID(tx2CID) tx1Hash = crypto.Keccak256Hash([]byte{01, 01}) tx2Hash = crypto.Keccak256Hash([]byte{01, 02}) opHash = crypto.Keccak256Hash([]byte{02, 01}) txModels1 = []btc.TxModelWithInsAndOuts{ { Index: 0, - CID: tx1CID, + CID: tx1CID.String(), + MhKey: tx1MhKey, TxHash: tx1Hash.String(), SegWit: true, TxInputs: []btc.TxInput{ @@ -75,7 +80,8 @@ var ( }, { Index: 1, - CID: tx2CID, + CID: tx2CID.String(), + MhKey: tx2MhKey, TxHash: tx2Hash.String(), SegWit: true, }, @@ -89,21 +95,25 @@ var ( // header variables blockHash2 = crypto.Keccak256Hash([]byte{00, 03}) blocKNumber2 = big.NewInt(1) - headerCid2 = "mockHeaderCID2" + headerCid2 = shared.TestCID([]byte("mockHeaderCID2")) + headerMhKey2 = shared.MultihashKeyFromCID(headerCid2) headerModel2 = btc.HeaderModel{ BlockNumber: blocKNumber2.String(), BlockHash: blockHash2.String(), ParentHash: blockHash1.String(), - CID: headerCid2, + CID: headerCid2.String(), + MhKey: headerMhKey2, } // tx variables - tx3CID = "mockTx3CID" + tx3CID = shared.TestCID([]byte("mockTx3CID")) + tx3MhKey = shared.MultihashKeyFromCID(tx3CID) tx3Hash = crypto.Keccak256Hash([]byte{01, 03}) txModels2 = []btc.TxModelWithInsAndOuts{ { Index: 0, - CID: tx3CID, + CID: tx3CID.String(), + MhKey: tx3MhKey, TxHash: tx3Hash.String(), SegWit: true, }, @@ -112,13 +122,13 @@ var ( HeaderCID: headerModel2, TransactionCIDs: txModels2, } - rngs = [][2]uint64{{0, 1}} - cids = []string{ - headerCid1, - headerCid2, - tx1CID, - tx2CID, - tx3CID, + rngs = [][2]uint64{{0, 1}} + mhKeys = []string{ + headerMhKey1, + headerMhKey2, + tx1MhKey, + tx2MhKey, + tx3MhKey, } mockData = []byte{'\x01'} ) @@ -139,16 +149,15 @@ var _ = Describe("Cleaner", func() { Describe("Clean", func() { BeforeEach(func() { + for _, key := range mhKeys { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData) + Expect(err).ToNot(HaveOccurred()) + } err := repo.Index(mockCIDPayload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(mockCIDPayload2) Expect(err).ToNot(HaveOccurred()) - for _, cid := range cids { - _, err = db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, cid, mockData) - Expect(err).ToNot(HaveOccurred()) - } - tx, err := db.Beginx() Expect(err).ToNot(HaveOccurred()) var startingIPFSBlocksCount int @@ -286,6 +295,11 @@ var _ = Describe("Cleaner", func() { Describe("ResetValidation", func() { BeforeEach(func() { + for _, key := range mhKeys { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData) + Expect(err).ToNot(HaveOccurred()) + } + err := repo.Index(mockCIDPayload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(mockCIDPayload2) diff --git a/pkg/btc/indexer.go b/pkg/btc/indexer.go index 4f6f1f5d..bd9af179 100644 --- a/pkg/btc/indexer.go +++ b/pkg/btc/indexer.go @@ -74,11 +74,11 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id, times_validated) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id, times_validated) = ($3, $4, $5, $6, $7, btc.header_cids.times_validated + 1) + err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id, mh_key, times_validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id, mh_key, times_validated) = ($3, $4, $5, $6, $7, $8, btc.header_cids.times_validated + 1) RETURNING id`, - header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, in.db.NodeID, 1).Scan(&headerID) + header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, in.db.NodeID, header.MhKey, 1).Scan(&headerID) return headerID, err } @@ -107,11 +107,11 @@ func (in *CIDIndexer) indexTransactionCIDs(tx *sqlx.Tx, transactions []TxModelWi func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModelWithInsAndOuts, headerID int64) (int64, error) { var txID int64 - err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, segwit, witness_hash) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (tx_hash) DO UPDATE SET (header_id, index, cid, segwit, witness_hash) = ($1, $3, $4, $5, $6) + err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, segwit, witness_hash, mh_key) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (tx_hash) DO UPDATE SET (header_id, index, cid, segwit, witness_hash, mh_key) = ($1, $3, $4, $5, $6, $7) RETURNING id`, - headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.SegWit, transaction.WitnessHash).Scan(&txID) + headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.SegWit, transaction.WitnessHash, transaction.MhKey).Scan(&txID) return txID, err } diff --git a/pkg/btc/indexer_test.go b/pkg/btc/indexer_test.go index 5484150f..8b4dea11 100644 --- a/pkg/btc/indexer_test.go +++ b/pkg/btc/indexer_test.go @@ -28,14 +28,20 @@ import ( var _ = Describe("Indexer", func() { var ( - db *postgres.DB - err error - repo *btc.CIDIndexer + db *postgres.DB + err error + repo *btc.CIDIndexer + mockData = []byte{1, 2, 3} ) BeforeEach(func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = btc.NewCIDIndexer(db) + // need entries in the public.blocks with the mhkeys or the FK constraint will fail + shared.PublishMockIPLD(db, mocks.MockHeaderMhKey, mockData) + shared.PublishMockIPLD(db, mocks.MockTrxMhKey1, mockData) + shared.PublishMockIPLD(db, mocks.MockTrxMhKey2, mockData) + shared.PublishMockIPLD(db, mocks.MockTrxMhKey3, mockData) }) AfterEach(func() { btc.TearDownDB(db) @@ -43,6 +49,7 @@ var _ = Describe("Indexer", func() { Describe("Index", func() { It("Indexes CIDs and related metadata into vulcanizedb", func() { + err = repo.Index(&mocks.MockCIDPayload) Expect(err).ToNot(HaveOccurred()) pgStr := `SELECT * FROM btc.header_cids @@ -72,13 +79,13 @@ var _ = Describe("Indexer", func() { Expect(tx.WitnessHash).To(Equal("")) switch tx.Index { case 0: - Expect(tx.CID).To(Equal("mockTrxCID1")) + Expect(tx.CID).To(Equal(mocks.MockTrxCID1.String())) Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[0].TxHash().String())) case 1: - Expect(tx.CID).To(Equal("mockTrxCID2")) + Expect(tx.CID).To(Equal(mocks.MockTrxCID2.String())) Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[1].TxHash().String())) case 2: - Expect(tx.CID).To(Equal("mockTrxCID3")) + Expect(tx.CID).To(Equal(mocks.MockTrxCID3.String())) Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[2].TxHash().String())) } } diff --git a/pkg/btc/ipld_pg_fetcher.go b/pkg/btc/ipld_pg_fetcher.go index af9ab34a..cd673c75 100644 --- a/pkg/btc/ipld_pg_fetcher.go +++ b/pkg/btc/ipld_pg_fetcher.go @@ -79,7 +79,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) // FetchHeaders fetches headers func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") - headerBytes, err := shared.FetchIPLD(tx, c.CID) + headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return ipfs.BlockModel{}, err } @@ -94,7 +94,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - trxBytes, err := shared.FetchIPLD(tx, c.CID) + trxBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } diff --git a/pkg/btc/mocks/test_data.go b/pkg/btc/mocks/test_data.go index 663a8b35..7fc347ea 100644 --- a/pkg/btc/mocks/test_data.go +++ b/pkg/btc/mocks/test_data.go @@ -25,11 +25,19 @@ import ( "github.com/btcsuite/btcd/txscript" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcutil" - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" ) var ( + MockHeaderCID = shared.TestCID([]byte("MockHeaderCID")) + MockTrxCID1 = shared.TestCID([]byte("MockTrxCID1")) + MockTrxCID2 = shared.TestCID([]byte("MockTrxCID2")) + MockTrxCID3 = shared.TestCID([]byte("MockTrxCID3")) + MockHeaderMhKey = shared.MultihashKeyFromCID(MockHeaderCID) + MockTrxMhKey1 = shared.MultihashKeyFromCID(MockTrxCID1) + MockTrxMhKey2 = shared.MultihashKeyFromCID(MockTrxCID2) + MockTrxMhKey3 = shared.MultihashKeyFromCID(MockTrxCID3) MockBlockHeight int64 = 1337 MockBlock = wire.MsgBlock{ Header: wire.BlockHeader{ @@ -479,7 +487,8 @@ var ( } MockTxsMetaDataPostPublish = []btc.TxModelWithInsAndOuts{ { - CID: "mockTrxCID1", + CID: MockTrxCID1.String(), + MhKey: MockTrxMhKey1, TxHash: MockBlock.Transactions[0].TxHash().String(), Index: 0, SegWit: MockBlock.Transactions[0].HasWitness(), @@ -517,7 +526,8 @@ var ( }, }, { - CID: "mockTrxCID2", + CID: MockTrxCID2.String(), + MhKey: MockTrxMhKey2, TxHash: MockBlock.Transactions[1].TxHash().String(), Index: 1, SegWit: MockBlock.Transactions[1].HasWitness(), @@ -594,7 +604,8 @@ var ( }, }, { - CID: "mockTrxCID3", + CID: MockTrxCID3.String(), + MhKey: MockTrxMhKey3, TxHash: MockBlock.Transactions[2].TxHash().String(), Index: 2, SegWit: MockBlock.Transactions[2].HasWitness(), @@ -671,7 +682,8 @@ var ( }, } MockHeaderMetaData = btc.HeaderModel{ - CID: "mockHeaderCID", + CID: MockHeaderCID.String(), + MhKey: MockHeaderMhKey, ParentHash: MockBlock.Header.PrevBlock.String(), BlockNumber: strconv.Itoa(int(MockBlockHeight)), BlockHash: MockBlock.Header.BlockHash().String(), @@ -686,53 +698,6 @@ var ( HeaderCID: MockHeaderMetaData, TransactionCIDs: MockTxsMetaDataPostPublish, } - DummyCIDPayloadForFKReference = btc.CIDPayload{ - HeaderCID: btc.HeaderModel{ - CID: "dummyHeader", - ParentHash: "", - BlockHash: "", - BlockNumber: "1336", - Bits: 1, - Timestamp: 1000000000, - }, - TransactionCIDs: []btc.TxModelWithInsAndOuts{ - { - TxHash: "87a157f3fd88ac7907c05fc55e271dc4acdc5605d187d646604ca8c0e9382e03", - CID: "dummyTx1", - Index: 0, - TxOutputs: []btc.TxOutput{ - { - Index: 0, - RequiredSigs: 0, - Value: 0, - PkScript: []byte{}, - ScriptClass: 0, - }, - }, - }, - { - TxHash: "cf4e2978d0611ce46592e02d7e7daf8627a316ab69759a9f3df109a7f2bf3ec3", - CID: "dummyTx2", - Index: 1, - TxOutputs: []btc.TxOutput{ - { - Index: 0, - RequiredSigs: 0, - Value: 0, - PkScript: []byte{}, - ScriptClass: 0, - }, - { - Index: 1, - RequiredSigs: 0, - Value: 0, - PkScript: []byte{}, - ScriptClass: 0, - }, - }, - }, - }, - } ) func stringSliceFromAddresses(addrs []btcutil.Address) []string { diff --git a/pkg/btc/models.go b/pkg/btc/models.go index 1317a1bc..c2bbb81c 100644 --- a/pkg/btc/models.go +++ b/pkg/btc/models.go @@ -25,6 +25,7 @@ type HeaderModel struct { BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Timestamp int64 `db:"timestamp"` Bits uint32 `db:"bits"` NodeID int64 `db:"node_id"` @@ -38,6 +39,7 @@ type TxModel struct { Index int64 `db:"index"` TxHash string `db:"tx_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` SegWit bool `db:"segwit"` WitnessHash string `db:"witness_hash"` } @@ -49,6 +51,7 @@ type TxModelWithInsAndOuts struct { Index int64 `db:"index"` TxHash string `db:"tx_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` SegWit bool `db:"segwit"` WitnessHash string `db:"witness_hash"` TxInputs []TxInput diff --git a/pkg/btc/publish_and_indexer.go b/pkg/btc/publish_and_indexer.go index 799434ae..dd5a72cb 100644 --- a/pkg/btc/publish_and_indexer.go +++ b/pkg/btc/publish_and_indexer.go @@ -80,6 +80,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share } header := HeaderModel{ CID: headerNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: ipldPayload.Header.PrevBlock.String(), BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)), BlockHash: ipldPayload.Header.BlockHash().String(), @@ -98,6 +99,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share } txModel := ipldPayload.TxMetaData[i] txModel.CID = txNode.Cid().String() + txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { return nil, err diff --git a/pkg/btc/publisher.go b/pkg/btc/publisher.go index 992f81ae..5c6c4d07 100644 --- a/pkg/btc/publisher.go +++ b/pkg/btc/publisher.go @@ -62,8 +62,10 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI if err != nil { return nil, err } + mhKey, _ := shared.MultihashKeyFromCIDString(headerCid) header := HeaderModel{ CID: headerCid, + MhKey: mhKey, ParentHash: ipldPayload.Header.PrevBlock.String(), BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)), BlockHash: ipldPayload.Header.BlockHash().String(), @@ -97,8 +99,10 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.BtcTx, txTrie if err != nil { return nil, err } + mhKey, _ := shared.MultihashKeyFromCIDString(cid) txCids[i] = TxModelWithInsAndOuts{ CID: cid, + MhKey: mhKey, Index: trxMeta[i].Index, TxHash: trxMeta[i].TxHash, SegWit: trxMeta[i].SegWit, diff --git a/pkg/btc/publisher_test.go b/pkg/btc/publisher_test.go index 2fdace5e..ef809f3a 100644 --- a/pkg/btc/publisher_test.go +++ b/pkg/btc/publisher_test.go @@ -57,12 +57,12 @@ var _ = Describe("Publisher", func() { Expect(err).ToNot(HaveOccurred()) tx3Bytes := by.Bytes() mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(headerBytes): "mockHeaderCID", + common.BytesToHash(headerBytes): mocks.MockHeaderCID.String(), } mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{ - common.BytesToHash(tx1Bytes): "mockTrxCID1", - common.BytesToHash(tx2Bytes): "mockTrxCID2", - common.BytesToHash(tx3Bytes): "mockTrxCID3", + common.BytesToHash(tx1Bytes): mocks.MockTrxCID1.String(), + common.BytesToHash(tx2Bytes): mocks.MockTrxCID2.String(), + common.BytesToHash(tx3Bytes): mocks.MockTrxCID3.String(), } publisher := btc.IPLDPublisher{ HeaderPutter: mockHeaderDagPutter, diff --git a/pkg/eth/api_test.go b/pkg/eth/api_test.go index fb7d3d53..edf4ec40 100644 --- a/pkg/eth/api_test.go +++ b/pkg/eth/api_test.go @@ -124,7 +124,7 @@ var _ = Describe("API", func() { }) Describe("GetTransactionByHash", func() { - It("Retrieves the head block number", func() { + It("Retrieves a transaction by hash", func() { hash := mocks.MockTransactions[0].Hash() tx, err := api.GetTransactionByHash(context.Background(), hash) Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/eth/backend.go b/pkg/eth/backend.go index 2128a47d..81ad8382 100644 --- a/pkg/eth/backend.go +++ b/pkg/eth/backend.go @@ -326,12 +326,12 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo // GetTransaction retrieves a tx by hash // It also returns the blockhash, blocknumber, and tx index associated with the transaction func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) { - pgStr := `SELECT transaction_cids.cid, transaction_cids.index, header_cids.block_hash, header_cids.block_number + pgStr := `SELECT transaction_cids.mh_key, transaction_cids.index, header_cids.block_hash, header_cids.block_number FROM eth.transaction_cids, eth.header_cids WHERE transaction_cids.header_id = header_cids.id AND transaction_cids.tx_hash = $1` var txCIDWithHeaderInfo struct { - CID string `db:"cid"` + MhKey string `db:"mh_key"` Index int64 `db:"index"` BlockHash string `db:"block_hash"` BlockNumber int64 `db:"block_number"` @@ -356,7 +356,7 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type } }() - txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{CID: txCIDWithHeaderInfo.CID}}) + txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{MhKey: txCIDWithHeaderInfo.MhKey}}) if err != nil { return nil, common.Hash{}, 0, 0, err } diff --git a/pkg/eth/cid_retriever.go b/pkg/eth/cid_retriever.go index 367b9ead..ad93d3c2 100644 --- a/pkg/eth/cid_retriever.go +++ b/pkg/eth/cid_retriever.go @@ -186,7 +186,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID results := make([]TxModel, 0) id := 1 pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, - transaction_cids.tx_hash, transaction_cids.cid, + transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.dst, transaction_cids.src, transaction_cids.index FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) WHERE header_cids.id = $%d`, id) @@ -210,8 +210,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for header id ", headerID) args := make([]interface{}, 0, 4) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, - receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, + receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id @@ -290,8 +290,8 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) { log.Debug("retrieving receipt cids for block ", blockNumber) args := make([]interface{}, 0, 5) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, - receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, + receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids WHERE receipt_cids.tx_id = transaction_cids.id @@ -387,7 +387,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, log.Debug("retrieving state cids for header id ", headerID) args := make([]interface{}, 0, 2) pgStr := `SELECT state_cids.id, state_cids.header_id, - state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.state_path + state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) WHERE header_cids.id = $1` args = append(args, headerID) @@ -411,8 +411,8 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) { log.Debug("retrieving storage cids for header id ", headerID) args := make([]interface{}, 0, 3) - pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, - storage_cids.node_type, storage_cids.cid, storage_cids.storage_path, state_cids.state_leaf_key + pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, + storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, state_cids.state_leaf_key FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE storage_cids.state_id = state_cids.id AND state_cids.header_id = header_cids.id @@ -607,8 +607,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ( // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) { log.Debugf("retrieving receipt cids for tx ids %v", txIDs) - pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract, - receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, + pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, + receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts FROM eth.receipt_cids, eth.transaction_cids WHERE tx_id = ANY($1::INTEGER[]) diff --git a/pkg/eth/cid_retriever_test.go b/pkg/eth/cid_retriever_test.go index 5a355bd9..668d763d 100644 --- a/pkg/eth/cid_retriever_test.go +++ b/pkg/eth/cid_retriever_test.go @@ -19,6 +19,8 @@ package eth_test import ( "math/big" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/common" . "github.com/onsi/ginkgo" @@ -209,14 +211,14 @@ var ( var _ = Describe("Retriever", func() { var ( db *postgres.DB - repo *eth2.CIDIndexer + repo *eth2.IPLDPublisherAndIndexer retriever *eth2.CIDRetriever ) BeforeEach(func() { var err error db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) - repo = eth2.NewCIDIndexer(db) + repo = eth2.NewIPLDPublisherAndIndexer(db) retriever = eth2.NewCIDRetriever(db) }) AfterEach(func() { @@ -225,7 +227,7 @@ var _ = Describe("Retriever", func() { Describe("Retrieve", func() { BeforeEach(func() { - err := repo.Index(mocks.MockCIDPayload) + _, err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) }) It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { @@ -395,6 +397,7 @@ var _ = Describe("Retriever", func() { NodeType: 2, StateKey: common.BytesToHash(mocks.AccountLeafKey).Hex(), CID: mocks.State2CID.String(), + MhKey: mocks.State2MhKey, Path: []byte{'\x0c'}, })) @@ -405,8 +408,12 @@ var _ = Describe("Retriever", func() { }) Describe("RetrieveFirstBlockNumber", func() { + It("Throws an error if there are no blocks in the database", func() { + _, err := retriever.RetrieveFirstBlockNumber() + Expect(err).To(HaveOccurred()) + }) It("Gets the number of the first block that has data in the database", func() { - err := repo.Index(mocks.MockCIDPayload) + _, err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -414,9 +421,9 @@ var _ = Describe("Retriever", func() { }) It("Gets the number of the first block that has data in the database", func() { - payload := *mocks.MockCIDPayload - payload.HeaderCID.BlockNumber = "1010101" - err := repo.Index(&payload) + payload := mocks.MockConvertedPayload + payload.Block = newMockBlock(1010101) + _, err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -424,13 +431,13 @@ var _ = Describe("Retriever", func() { }) It("Gets the number of the first block that has data in the database", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload1 := mocks.MockConvertedPayload + payload1.Block = newMockBlock(1010101) payload2 := payload1 - payload2.HeaderCID.BlockNumber = "5" - err := repo.Index(&payload1) + payload2.Block = newMockBlock(5) + _, err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveFirstBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -439,8 +446,12 @@ var _ = Describe("Retriever", func() { }) Describe("RetrieveLastBlockNumber", func() { + It("Throws an error if there are no blocks in the database", func() { + _, err := retriever.RetrieveLastBlockNumber() + Expect(err).To(HaveOccurred()) + }) It("Gets the number of the latest block that has data in the database", func() { - err := repo.Index(mocks.MockCIDPayload) + _, err := repo.Publish(mocks.MockConvertedPayload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -448,9 +459,9 @@ var _ = Describe("Retriever", func() { }) It("Gets the number of the latest block that has data in the database", func() { - payload := *mocks.MockCIDPayload - payload.HeaderCID.BlockNumber = "1010101" - err := repo.Index(&payload) + payload := mocks.MockConvertedPayload + payload.Block = newMockBlock(1010101) + _, err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -458,13 +469,13 @@ var _ = Describe("Retriever", func() { }) It("Gets the number of the latest block that has data in the database", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload1 := mocks.MockConvertedPayload + payload1.Block = newMockBlock(1010101) payload2 := payload1 - payload2.HeaderCID.BlockNumber = "5" - err := repo.Index(&payload1) + payload2.Block = newMockBlock(5) + _, err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) num, err := retriever.RetrieveLastBlockNumber() Expect(err).ToNot(HaveOccurred()) @@ -474,21 +485,20 @@ var _ = Describe("Retriever", func() { Describe("RetrieveGapsInData", func() { It("Doesn't return gaps if there are none", func() { - payload0 := *mocks.MockCIDPayload - payload0.HeaderCID.BlockNumber = "0" - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1" + payload0 := mocks.MockConvertedPayload + payload0.Block = newMockBlock(0) + payload1 := mocks.MockConvertedPayload payload2 := payload1 - payload2.HeaderCID.BlockNumber = "2" + payload2.Block = newMockBlock(2) payload3 := payload2 - payload3.HeaderCID.BlockNumber = "3" - err := repo.Index(&payload0) + payload3.Block = newMockBlock(3) + _, err := repo.Publish(payload0) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload1) + _, err = repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload3) + _, err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -496,9 +506,9 @@ var _ = Describe("Retriever", func() { }) It("Returns the gap from 0 to the earliest block", func() { - payload := *mocks.MockCIDPayload - payload.HeaderCID.BlockNumber = "5" - err := repo.Index(&payload) + payload := mocks.MockConvertedPayload + payload.Block = newMockBlock(5) + _, err := repo.Publish(payload) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -508,17 +518,16 @@ var _ = Describe("Retriever", func() { }) It("Can handle single block gaps", func() { - payload0 := *mocks.MockCIDPayload - payload0.HeaderCID.BlockNumber = "0" - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1" + payload0 := mocks.MockConvertedPayload + payload0.Block = newMockBlock(0) + payload1 := mocks.MockConvertedPayload payload3 := payload1 - payload3.HeaderCID.BlockNumber = "3" - err := repo.Index(&payload0) + payload3.Block = newMockBlock(3) + _, err := repo.Publish(payload0) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload1) + _, err = repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload3) + _, err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -528,13 +537,13 @@ var _ = Describe("Retriever", func() { }) It("Finds gap between two entries", func() { - payload1 := *mocks.MockCIDPayload - payload1.HeaderCID.BlockNumber = "1010101" + payload1 := mocks.MockConvertedPayload + payload1.Block = newMockBlock(1010101) payload2 := payload1 - payload2.HeaderCID.BlockNumber = "0" - err := repo.Index(&payload1) + payload2.Block = newMockBlock(0) + _, err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) Expect(err).ToNot(HaveOccurred()) @@ -544,49 +553,50 @@ var _ = Describe("Retriever", func() { }) It("Finds gaps between multiple entries", func() { - payload := *mocks.MockCIDPayload - payload.HeaderCID.BlockNumber = "1010101" - payload1 := payload - payload1.HeaderCID.BlockNumber = "1" - payload2 := payload1 - payload2.HeaderCID.BlockNumber = "5" - payload3 := payload2 - payload3.HeaderCID.BlockNumber = "100" - payload4 := payload3 - payload4.HeaderCID.BlockNumber = "101" - payload5 := payload4 - payload5.HeaderCID.BlockNumber = "102" - payload6 := payload4 - payload6.HeaderCID.BlockNumber = "103" - payload7 := payload4 - payload7.HeaderCID.BlockNumber = "104" - payload8 := payload4 - payload8.HeaderCID.BlockNumber = "105" - payload9 := payload4 - payload9.HeaderCID.BlockNumber = "106" - payload10 := payload5 - payload10.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload) + payload1 := mocks.MockConvertedPayload + payload1.Block = newMockBlock(1010101) + payload2 := mocks.MockConvertedPayload + payload2.Block = newMockBlock(1) + payload3 := mocks.MockConvertedPayload + payload3.Block = newMockBlock(5) + payload4 := mocks.MockConvertedPayload + payload4.Block = newMockBlock(100) + payload5 := mocks.MockConvertedPayload + payload5.Block = newMockBlock(101) + payload6 := mocks.MockConvertedPayload + payload6.Block = newMockBlock(102) + payload7 := mocks.MockConvertedPayload + payload7.Block = newMockBlock(103) + payload8 := mocks.MockConvertedPayload + payload8.Block = newMockBlock(104) + payload9 := mocks.MockConvertedPayload + payload9.Block = newMockBlock(105) + payload10 := mocks.MockConvertedPayload + payload10.Block = newMockBlock(106) + payload11 := mocks.MockConvertedPayload + payload11.Block = newMockBlock(1000) + + _, err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload1) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload3) + _, err = repo.Publish(payload4) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload4) + _, err = repo.Publish(payload5) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload5) + _, err = repo.Publish(payload6) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload6) + _, err = repo.Publish(payload7) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload7) + _, err = repo.Publish(payload8) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload8) + _, err = repo.Publish(payload9) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload9) + _, err = repo.Publish(payload10) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload10) + _, err = repo.Publish(payload11) Expect(err).ToNot(HaveOccurred()) gaps, err := retriever.RetrieveGapsInData(1) @@ -600,61 +610,63 @@ var _ = Describe("Retriever", func() { }) It("Finds validation level gaps", func() { - payload := *mocks.MockCIDPayload - payload.HeaderCID.BlockNumber = "1010101" - payload1 := payload - payload1.HeaderCID.BlockNumber = "1" - payload2 := payload1 - payload2.HeaderCID.BlockNumber = "5" - payload3 := payload2 - payload3.HeaderCID.BlockNumber = "100" - payload4 := payload3 - payload4.HeaderCID.BlockNumber = "101" - payload5 := payload4 - payload5.HeaderCID.BlockNumber = "102" - payload6 := payload4 - payload6.HeaderCID.BlockNumber = "103" - payload7 := payload4 - payload7.HeaderCID.BlockNumber = "104" - payload8 := payload4 - payload8.HeaderCID.BlockNumber = "105" - payload9 := payload4 - payload9.HeaderCID.BlockNumber = "106" - payload10 := payload4 - payload10.HeaderCID.BlockNumber = "107" - payload11 := payload4 - payload11.HeaderCID.BlockNumber = "108" - payload12 := payload4 - payload12.HeaderCID.BlockNumber = "109" - payload13 := payload5 - payload13.HeaderCID.BlockNumber = "1000" - err := repo.Index(&payload) + + payload1 := mocks.MockConvertedPayload + payload1.Block = newMockBlock(1010101) + payload2 := mocks.MockConvertedPayload + payload2.Block = newMockBlock(1) + payload3 := mocks.MockConvertedPayload + payload3.Block = newMockBlock(5) + payload4 := mocks.MockConvertedPayload + payload4.Block = newMockBlock(100) + payload5 := mocks.MockConvertedPayload + payload5.Block = newMockBlock(101) + payload6 := mocks.MockConvertedPayload + payload6.Block = newMockBlock(102) + payload7 := mocks.MockConvertedPayload + payload7.Block = newMockBlock(103) + payload8 := mocks.MockConvertedPayload + payload8.Block = newMockBlock(104) + payload9 := mocks.MockConvertedPayload + payload9.Block = newMockBlock(105) + payload10 := mocks.MockConvertedPayload + payload10.Block = newMockBlock(106) + payload11 := mocks.MockConvertedPayload + payload11.Block = newMockBlock(107) + payload12 := mocks.MockConvertedPayload + payload12.Block = newMockBlock(108) + payload13 := mocks.MockConvertedPayload + payload13.Block = newMockBlock(109) + payload14 := mocks.MockConvertedPayload + payload14.Block = newMockBlock(1000) + + _, err := repo.Publish(payload1) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload1) + _, err = repo.Publish(payload2) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload2) + _, err = repo.Publish(payload3) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload3) + _, err = repo.Publish(payload4) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload4) + _, err = repo.Publish(payload5) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload5) + _, err = repo.Publish(payload6) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload6) + _, err = repo.Publish(payload7) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload7) + _, err = repo.Publish(payload8) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload8) + _, err = repo.Publish(payload9) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload9) + _, err = repo.Publish(payload10) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload10) + _, err = repo.Publish(payload11) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload11) + _, err = repo.Publish(payload12) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload12) + _, err = repo.Publish(payload13) Expect(err).ToNot(HaveOccurred()) - err = repo.Index(&payload13) + _, err = repo.Publish(payload14) Expect(err).ToNot(HaveOccurred()) cleaner := eth.NewCleaner(db) @@ -675,3 +687,9 @@ var _ = Describe("Retriever", func() { }) }) }) + +func newMockBlock(blockNumber uint64) *types.Block { + header := mocks.MockHeader + header.Number.SetUint64(blockNumber) + return types.NewBlock(&mocks.MockHeader, mocks.MockTransactions, nil, mocks.MockReceipts) +} diff --git a/pkg/eth/cleaner.go b/pkg/eth/cleaner.go index e8686037..3b0bdf82 100644 --- a/pkg/eth/cleaner.go +++ b/pkg/eth/cleaner.go @@ -243,7 +243,7 @@ func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanStorageIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.storage_cids B, eth.state_cids C, eth.header_cids D - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.state_id = C.id AND C.header_id = D.id AND D.block_number BETWEEN $1 AND $2` @@ -264,7 +264,7 @@ func (c *Cleaner) cleanStorageMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanStateIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.state_cids B, eth.header_cids C - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.header_id = C.id AND C.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) @@ -283,7 +283,7 @@ func (c *Cleaner) cleanStateMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanReceiptIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.receipt_cids B, eth.transaction_cids C, eth.header_cids D - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.tx_id = C.id AND C.header_id = D.id AND D.block_number BETWEEN $1 AND $2` @@ -304,7 +304,7 @@ func (c *Cleaner) cleanReceiptMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.transaction_cids B, eth.header_cids C - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.header_id = C.id AND C.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) @@ -323,7 +323,7 @@ func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanUncleIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.uncle_cids B, eth.header_cids C - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.header_id = C.id AND C.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) @@ -342,7 +342,7 @@ func (c *Cleaner) cleanUncleMetaData(tx *sqlx.Tx, rng [2]uint64) error { func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error { pgStr := `DELETE FROM public.blocks A USING eth.header_cids B - WHERE A.key = B.cid + WHERE A.key = B.mh_key AND B.block_number BETWEEN $1 AND $2` _, err := tx.Exec(pgStr, rng[0], rng[1]) return err diff --git a/pkg/eth/cleaner_test.go b/pkg/eth/cleaner_test.go index 80beb9b2..14900bbc 100644 --- a/pkg/eth/cleaner_test.go +++ b/pkg/eth/cleaner_test.go @@ -34,47 +34,55 @@ var ( // header variables blockHash1 = crypto.Keccak256Hash([]byte{00, 02}) blocKNumber1 = big.NewInt(0) - headerCID1 = "mockHeader1CID" + headerCID1 = shared.TestCID([]byte("mockHeader1CID")) + headerMhKey1 = shared.MultihashKeyFromCID(headerCID1) parentHash = crypto.Keccak256Hash([]byte{00, 01}) totalDifficulty = "50000000000000000000" reward = "5000000000000000000" headerModel = eth.HeaderModel{ BlockHash: blockHash1.String(), BlockNumber: blocKNumber1.String(), - CID: headerCID1, + CID: headerCID1.String(), + MhKey: headerMhKey1, ParentHash: parentHash.String(), TotalDifficulty: totalDifficulty, Reward: reward, } // tx variables - tx1CID = "mockTx1CID" - tx2CID = "mockTx2CID" + tx1CID = shared.TestCID([]byte("mockTx1CID")) + tx1MhKey = shared.MultihashKeyFromCID(tx1CID) + tx2CID = shared.TestCID([]byte("mockTx2CID")) + tx2MhKey = shared.MultihashKeyFromCID(tx2CID) tx1Hash = crypto.Keccak256Hash([]byte{01, 01}) tx2Hash = crypto.Keccak256Hash([]byte{01, 02}) txSrc = common.HexToAddress("0x010a") txDst = common.HexToAddress("0x020a") txModels1 = []eth.TxModel{ { - CID: tx1CID, + CID: tx1CID.String(), + MhKey: tx1MhKey, TxHash: tx1Hash.String(), Index: 0, }, { - CID: tx2CID, + CID: tx2CID.String(), + MhKey: tx2MhKey, TxHash: tx2Hash.String(), Index: 1, }, } // uncle variables - uncleCID = "mockUncle1CID" + uncleCID = shared.TestCID([]byte("mockUncle1CID")) + uncleMhKey = shared.MultihashKeyFromCID(uncleCID) uncleHash = crypto.Keccak256Hash([]byte{02, 02}) uncleParentHash = crypto.Keccak256Hash([]byte{02, 01}) uncleReward = "1000000000000000000" uncleModels1 = []eth.UncleModel{ { - CID: uncleCID, + CID: uncleCID.String(), + MhKey: uncleMhKey, Reward: uncleReward, BlockHash: uncleHash.String(), ParentHash: uncleParentHash.String(), @@ -82,37 +90,45 @@ var ( } // receipt variables - rct1CID = "mockRct1CID" - rct2CID = "mockRct2CID" + rct1CID = shared.TestCID([]byte("mockRct1CID")) + rct1MhKey = shared.MultihashKeyFromCID(rct1CID) + rct2CID = shared.TestCID([]byte("mockRct2CID")) + rct2MhKey = shared.MultihashKeyFromCID(rct2CID) rct1Contract = common.Address{} rct2Contract = common.HexToAddress("0x010c") receiptModels1 = map[common.Hash]eth.ReceiptModel{ tx1Hash: { - CID: rct1CID, + CID: rct1CID.String(), + MhKey: rct1MhKey, ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(), }, tx2Hash: { - CID: rct2CID, + CID: rct2CID.String(), + MhKey: rct2MhKey, ContractHash: crypto.Keccak256Hash(rct2Contract.Bytes()).String(), }, } // state variables - state1CID1 = "mockState1CID1" + state1CID1 = shared.TestCID([]byte("mockState1CID1")) + state1MhKey1 = shared.MultihashKeyFromCID(state1CID1) state1Path = []byte{'\x01'} state1Key = crypto.Keccak256Hash(txSrc.Bytes()) - state2CID1 = "mockState2CID1" + state2CID1 = shared.TestCID([]byte("mockState2CID1")) + state2MhKey1 = shared.MultihashKeyFromCID(state2CID1) state2Path = []byte{'\x02'} state2Key = crypto.Keccak256Hash(txDst.Bytes()) stateModels1 = []eth.StateNodeModel{ { - CID: state1CID1, + CID: state1CID1.String(), + MhKey: state1MhKey1, Path: state1Path, NodeType: 2, StateKey: state1Key.String(), }, { - CID: state2CID1, + CID: state2CID1.String(), + MhKey: state2MhKey1, Path: state2Path, NodeType: 2, StateKey: state2Key.String(), @@ -120,13 +136,15 @@ var ( } // storage variables - storageCID = "mockStorageCID1" + storageCID = shared.TestCID([]byte("mockStorageCID1")) + storageMhKey = shared.MultihashKeyFromCID(storageCID) storagePath = []byte{'\x01'} storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000")) storageModels1 = map[string][]eth.StorageNodeModel{ common.Bytes2Hex(state1Path): { { - CID: storageCID, + CID: storageCID.String(), + MhKey: storageMhKey, StorageKey: storageKey.String(), Path: storagePath, NodeType: 2, @@ -146,39 +164,47 @@ var ( // header variables blockHash2 = crypto.Keccak256Hash([]byte{00, 03}) blocKNumber2 = big.NewInt(1) - headerCID2 = "mockHeaderCID2" + headerCID2 = shared.TestCID([]byte("mockHeaderCID2")) + headerMhKey2 = shared.MultihashKeyFromCID(headerCID2) headerModel2 = eth.HeaderModel{ BlockHash: blockHash2.String(), BlockNumber: blocKNumber2.String(), - CID: headerCID2, + CID: headerCID2.String(), + MhKey: headerMhKey2, ParentHash: blockHash1.String(), TotalDifficulty: totalDifficulty, Reward: reward, } // tx variables - tx3CID = "mockTx3CID" + tx3CID = shared.TestCID([]byte("mockTx3CID")) + tx3MhKey = shared.MultihashKeyFromCID(tx3CID) tx3Hash = crypto.Keccak256Hash([]byte{01, 03}) txModels2 = []eth.TxModel{ { - CID: tx3CID, + CID: tx3CID.String(), + MhKey: tx3MhKey, TxHash: tx3Hash.String(), Index: 0, }, } // receipt variables - rct3CID = "mockRct3CID" + rct3CID = shared.TestCID([]byte("mockRct3CID")) + rct3MhKey = shared.MultihashKeyFromCID(rct3CID) receiptModels2 = map[common.Hash]eth.ReceiptModel{ tx3Hash: { - CID: rct3CID, + CID: rct3CID.String(), + MhKey: rct3MhKey, ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(), }, } // state variables - state1CID2 = "mockState1CID2" + state1CID2 = shared.TestCID([]byte("mockState1CID2")) + state1MhKey2 = shared.MultihashKeyFromCID(state1CID2) stateModels2 = []eth.StateNodeModel{ { - CID: state1CID2, + CID: state1CID2.String(), + MhKey: state1MhKey2, Path: state1Path, NodeType: 2, StateKey: state1Key.String(), @@ -190,21 +216,21 @@ var ( ReceiptCIDs: receiptModels2, StateNodeCIDs: stateModels2, } - rngs = [][2]uint64{{0, 1}} - cids = []string{ - headerCID1, - headerCID2, - uncleCID, - tx1CID, - tx2CID, - tx3CID, - rct1CID, - rct2CID, - rct3CID, - state1CID1, - state2CID1, - state1CID2, - storageCID, + rngs = [][2]uint64{{0, 1}} + mhKeys = []string{ + headerMhKey1, + headerMhKey2, + uncleMhKey, + tx1MhKey, + tx2MhKey, + tx3MhKey, + rct1MhKey, + rct2MhKey, + rct3MhKey, + state1MhKey1, + state2MhKey1, + state1MhKey2, + storageMhKey, } mockData = []byte{'\x01'} ) @@ -224,16 +250,16 @@ var _ = Describe("Cleaner", func() { }) Describe("Clean", func() { BeforeEach(func() { + for _, key := range mhKeys { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData) + Expect(err).ToNot(HaveOccurred()) + } + err := repo.Index(mockCIDPayload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(mockCIDPayload2) Expect(err).ToNot(HaveOccurred()) - for _, cid := range cids { - _, err = db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, cid, mockData) - Expect(err).ToNot(HaveOccurred()) - } - tx, err := db.Beginx() Expect(err).ToNot(HaveOccurred()) @@ -613,6 +639,11 @@ var _ = Describe("Cleaner", func() { Describe("ResetValidation", func() { BeforeEach(func() { + for _, key := range mhKeys { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData) + Expect(err).ToNot(HaveOccurred()) + } + err := repo.Index(mockCIDPayload1) Expect(err).ToNot(HaveOccurred()) err = repo.Index(mockCIDPayload2) diff --git a/pkg/eth/converter.go b/pkg/eth/converter.go index 4fb7bea6..1d6d2d31 100644 --- a/pkg/eth/converter.go +++ b/pkg/eth/converter.go @@ -72,8 +72,8 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert return nil, err } txMeta := TxModel{ - Dst: shared.HandleNullAddrPointer(trx.To()), - Src: shared.HandleNullAddr(from), + Dst: shared.HandleZeroAddrPointer(trx.To()), + Src: shared.HandleZeroAddr(from), TxHash: trx.Hash().String(), Index: int64(i), } @@ -106,7 +106,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert logContracts = append(logContracts, addr) } // This is the contract address if this receipt is for a contract creation tx - contract := shared.HandleNullAddr(receipt.ContractAddress) + contract := shared.HandleZeroAddr(receipt.ContractAddress) var contractHash string if contract != "" { contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String() diff --git a/pkg/eth/indexer.go b/pkg/eth/indexer.go index 48f57373..4285c9fa 100644 --- a/pkg/eth/indexer.go +++ b/pkg/eth/indexer.go @@ -90,29 +90,29 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error { func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, eth.header_cids.times_validated + 1) + err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1) RETURNING id`, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, 1).Scan(&headerID) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID) return headerID, err } func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int64) error { - _, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward) = ($3, $4, $5)`, - uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward) + _, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`, + uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) return err } func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error { for _, trxCidMeta := range payload.TransactionCIDs { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index) = ($3, $4, $5, $6) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7) RETURNING id`, - headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index).Scan(&txID) + headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey).Scan(&txID) if err != nil { return err } @@ -128,17 +128,17 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModel, headerID int64) (int64, error) { var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index) = ($3, $4, $5, $6) + err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7) RETURNING id`, - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index).Scan(&txID) + headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey).Scan(&txID) return txID, err } func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error { - _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) = ($2, $3, $4, $5, $6, $7, $8, $9)`, - txID, cidMeta.CID, cidMeta.Contract, cidMeta.ContractHash, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s, cidMeta.LogContracts) + _, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`, + txID, cidMeta.CID, cidMeta.Contract, cidMeta.ContractHash, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s, cidMeta.LogContracts, cidMeta.MhKey) return err } @@ -149,10 +149,10 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload, if stateCID.StateKey != nullHash.String() { stateKey = stateCID.StateKey } - err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6) + err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7) RETURNING id`, - headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true).Scan(&stateID) + headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true, stateCID.MhKey).Scan(&stateID) if err != nil { return err } @@ -180,10 +180,10 @@ func (in *CIDIndexer) indexStateCID(tx *sqlx.Tx, stateNode StateNodeModel, heade if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6) + err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7) RETURNING id`, - headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true).Scan(&stateID) + headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) return stateID, err } @@ -199,8 +199,8 @@ func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel, if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)`, - stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true) + _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (state_id, storage_path, diff) DO UPDATE SET (storage_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)`, + stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) return err } diff --git a/pkg/eth/indexer_test.go b/pkg/eth/indexer_test.go index 03169249..a00c958a 100644 --- a/pkg/eth/indexer_test.go +++ b/pkg/eth/indexer_test.go @@ -37,6 +37,17 @@ var _ = Describe("Indexer", func() { db, err = shared.SetupDB() Expect(err).ToNot(HaveOccurred()) repo = eth.NewCIDIndexer(db) + // need entries in the public.blocks with the mhkeys or the FK constraint will fail + shared.PublishMockIPLD(db, mocks.HeaderMhKey, mockData) + shared.PublishMockIPLD(db, mocks.Trx1MhKey, mockData) + shared.PublishMockIPLD(db, mocks.Trx2MhKey, mockData) + shared.PublishMockIPLD(db, mocks.Trx3MhKey, mockData) + shared.PublishMockIPLD(db, mocks.Rct1MhKey, mockData) + shared.PublishMockIPLD(db, mocks.Rct2MhKey, mockData) + shared.PublishMockIPLD(db, mocks.Rct3MhKey, mockData) + shared.PublishMockIPLD(db, mocks.State1MhKey, mockData) + shared.PublishMockIPLD(db, mocks.State2MhKey, mockData) + shared.PublishMockIPLD(db, mocks.StorageMhKey, mockData) }) AfterEach(func() { eth.TearDownDB(db) diff --git a/pkg/eth/ipld_pg_fetcher.go b/pkg/eth/ipld_pg_fetcher.go index d344d9c1..bbfe6b3d 100644 --- a/pkg/eth/ipld_pg_fetcher.go +++ b/pkg/eth/ipld_pg_fetcher.go @@ -102,7 +102,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error) // FetchHeaders fetches headers func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) { log.Debug("fetching header ipld") - headerBytes, err := shared.FetchIPLD(tx, c.CID) + headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return ipfs.BlockModel{}, err } @@ -117,7 +117,7 @@ func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.Bloc log.Debug("fetching uncle iplds") uncleIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - uncleBytes, err := shared.FetchIPLD(tx, c.CID) + uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } @@ -134,7 +134,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode log.Debug("fetching transaction iplds") trxIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - txBytes, err := shared.FetchIPLD(tx, c.CID) + txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } @@ -151,7 +151,7 @@ func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.Bloc log.Debug("fetching receipt iplds") rctIPLDs := make([]ipfs.BlockModel, len(cids)) for i, c := range cids { - rctBytes, err := shared.FetchIPLD(tx, c.CID) + rctBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) if err != nil { return nil, err } @@ -171,7 +171,7 @@ func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateN if stateNode.CID == "" { continue } - stateBytes, err := shared.FetchIPLD(tx, stateNode.CID) + stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey) if err != nil { return nil, err } @@ -196,7 +196,7 @@ func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKey if storageNode.CID == "" || storageNode.StateKey == "" { continue } - storageBytes, err := shared.FetchIPLD(tx, storageNode.CID) + storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey) if err != nil { return nil, err } diff --git a/pkg/eth/mocks/test_data.go b/pkg/eth/mocks/test_data.go index ca925421..fb5a32fe 100644 --- a/pkg/eth/mocks/test_data.go +++ b/pkg/eth/mocks/test_data.go @@ -22,6 +22,8 @@ import ( "crypto/rand" "math/big" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" @@ -76,18 +78,29 @@ var ( Data: []byte{}, } HeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256) + HeaderMhKey = shared.MultihashKeyFromCID(HeaderCID) Trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(0), multihash.KECCAK_256) + Trx1MhKey = shared.MultihashKeyFromCID(Trx1CID) Trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(1), multihash.KECCAK_256) + Trx2MhKey = shared.MultihashKeyFromCID(Trx2CID) Trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(2), multihash.KECCAK_256) + Trx3MhKey = shared.MultihashKeyFromCID(Trx3CID) Rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(0), multihash.KECCAK_256) + Rct1MhKey = shared.MultihashKeyFromCID(Rct1CID) Rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(1), multihash.KECCAK_256) + Rct2MhKey = shared.MultihashKeyFromCID(Rct2CID) Rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(2), multihash.KECCAK_256) + Rct3MhKey = shared.MultihashKeyFromCID(Rct3CID) State1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, ContractLeafNode, multihash.KECCAK_256) + State1MhKey = shared.MultihashKeyFromCID(State1CID) State2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, AccountLeafNode, multihash.KECCAK_256) + State2MhKey = shared.MultihashKeyFromCID(State2CID) StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256) + StorageMhKey = shared.MultihashKeyFromCID(StorageCID) MockTrxMeta = []eth.TxModel{ { CID: "", // This is empty until we go to publish to ipfs + MhKey: "", Src: SenderAddr.Hex(), Dst: Address.String(), Index: 0, @@ -95,6 +108,7 @@ var ( }, { CID: "", + MhKey: "", Src: SenderAddr.Hex(), Dst: AnotherAddress.String(), Index: 1, @@ -102,6 +116,7 @@ var ( }, { CID: "", + MhKey: "", Src: SenderAddr.Hex(), Dst: "", Index: 2, @@ -111,6 +126,7 @@ var ( MockTrxMetaPostPublsh = []eth.TxModel{ { CID: Trx1CID.String(), // This is empty until we go to publish to ipfs + MhKey: Trx1MhKey, Src: SenderAddr.Hex(), Dst: Address.String(), Index: 0, @@ -118,6 +134,7 @@ var ( }, { CID: Trx2CID.String(), + MhKey: Trx2MhKey, Src: SenderAddr.Hex(), Dst: AnotherAddress.String(), Index: 1, @@ -125,6 +142,7 @@ var ( }, { CID: Trx3CID.String(), + MhKey: Trx3MhKey, Src: SenderAddr.Hex(), Dst: "", Index: 2, @@ -133,7 +151,8 @@ var ( } MockRctMeta = []eth.ReceiptModel{ { - CID: "", + CID: "", + MhKey: "", Topic0s: []string{ mockTopic11.String(), }, @@ -147,7 +166,8 @@ var ( }, }, { - CID: "", + CID: "", + MhKey: "", Topic0s: []string{ mockTopic21.String(), }, @@ -162,6 +182,7 @@ var ( }, { CID: "", + MhKey: "", Contract: ContractAddress.String(), ContractHash: ContractHash, LogContracts: []string{}, @@ -169,7 +190,8 @@ var ( } MockRctMetaPostPublish = []eth.ReceiptModel{ { - CID: Rct1CID.String(), + CID: Rct1CID.String(), + MhKey: Rct1MhKey, Topic0s: []string{ mockTopic11.String(), }, @@ -183,7 +205,8 @@ var ( }, }, { - CID: Rct2CID.String(), + CID: Rct2CID.String(), + MhKey: Rct2MhKey, Topic0s: []string{ mockTopic21.String(), }, @@ -198,6 +221,7 @@ var ( }, { CID: Rct3CID.String(), + MhKey: Rct3MhKey, Contract: ContractAddress.String(), ContractHash: ContractHash, LogContracts: []string{}, @@ -296,12 +320,14 @@ var ( MockStateMetaPostPublish = []eth.StateNodeModel{ { CID: State1CID.String(), + MhKey: State1MhKey, Path: []byte{'\x06'}, NodeType: 2, StateKey: common.BytesToHash(ContractLeafKey).Hex(), }, { CID: State2CID.String(), + MhKey: State2MhKey, Path: []byte{'\x0c'}, NodeType: 2, StateKey: common.BytesToHash(AccountLeafKey).Hex(), @@ -341,6 +367,7 @@ var ( BlockHash: MockBlock.Hash().String(), BlockNumber: MockBlock.Number().String(), CID: HeaderCID.String(), + MhKey: HeaderMhKey, ParentHash: MockBlock.ParentHash().String(), TotalDifficulty: MockBlock.Difficulty().String(), Reward: "5000000000000000000", @@ -363,6 +390,7 @@ var ( contractPath: { { CID: StorageCID.String(), + MhKey: StorageMhKey, Path: []byte{}, StorageKey: common.BytesToHash(StorageLeafKey).Hex(), NodeType: 2, @@ -392,6 +420,7 @@ var ( BlockHash: MockBlock.Hash().String(), ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000", CID: HeaderCID.String(), + MhKey: HeaderMhKey, TotalDifficulty: MockBlock.Difficulty().String(), Reward: "5000000000000000000", StateRoot: MockBlock.Root().String(), @@ -410,6 +439,7 @@ var ( { Path: []byte{}, CID: StorageCID.String(), + MhKey: StorageMhKey, NodeType: 2, StateKey: common.BytesToHash(ContractLeafKey).Hex(), StorageKey: common.BytesToHash(StorageLeafKey).Hex(), diff --git a/pkg/eth/models.go b/pkg/eth/models.go index e9b860ec..6c44a801 100644 --- a/pkg/eth/models.go +++ b/pkg/eth/models.go @@ -25,6 +25,7 @@ type HeaderModel struct { BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` TotalDifficulty string `db:"td"` NodeID int64 `db:"node_id"` Reward string `db:"reward"` @@ -44,6 +45,7 @@ type UncleModel struct { BlockHash string `db:"block_hash"` ParentHash string `db:"parent_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Reward string `db:"reward"` } @@ -54,6 +56,7 @@ type TxModel struct { Index int64 `db:"index"` TxHash string `db:"tx_hash"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Dst string `db:"dst"` Src string `db:"src"` } @@ -63,6 +66,7 @@ type ReceiptModel struct { ID int64 `db:"id"` TxID int64 `db:"tx_id"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Contract string `db:"contract"` ContractHash string `db:"contract_hash"` LogContracts pq.StringArray `db:"log_contracts"` @@ -80,6 +84,7 @@ type StateNodeModel struct { StateKey string `db:"state_leaf_key"` NodeType int `db:"node_type"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Diff bool `db:"diff"` } @@ -91,6 +96,7 @@ type StorageNodeModel struct { StorageKey string `db:"storage_leaf_key"` NodeType int `db:"node_type"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Diff bool `db:"diff"` } @@ -103,6 +109,7 @@ type StorageNodeWithStateKeyModel struct { StorageKey string `db:"storage_leaf_key"` NodeType int `db:"node_type"` CID string `db:"cid"` + MhKey string `db:"mh_key"` Diff bool `db:"diff"` } diff --git a/pkg/eth/publish_and_indexer.go b/pkg/eth/publish_and_indexer.go index f352440e..eceaf59d 100644 --- a/pkg/eth/publish_and_indexer.go +++ b/pkg/eth/publish_and_indexer.go @@ -92,6 +92,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) header := HeaderModel{ CID: headerNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: ipldPayload.Block.ParentHash().String(), BlockNumber: ipldPayload.Block.Number().String(), BlockHash: ipldPayload.Block.Hash().String(), @@ -117,6 +118,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64()) uncle := UncleModel{ CID: uncleNode.Cid().String(), + MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), ParentHash: uncleNode.ParentHash.String(), BlockHash: uncleNode.Hash().String(), Reward: uncleReward.String(), @@ -137,12 +139,14 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share } txModel := ipldPayload.TxMetaData[i] txModel.CID = txNode.Cid().String() + txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid()) txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID) if err != nil { return nil, err } rctModel := ipldPayload.ReceiptMetaData[i] rctModel.CID = rctNode.Cid().String() + rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid()) if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil { return nil, err } @@ -162,10 +166,12 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, if err != nil { return err } + mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr) stateModel := StateNodeModel{ Path: stateNode.Path, StateKey: stateNode.LeafKey.String(), CID: stateCIDStr, + MhKey: mhKey, NodeType: ResolveFromNodeType(stateNode.Type), } stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID) @@ -199,10 +205,12 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, if err != nil { return err } + mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr) storageModel := StorageNodeModel{ Path: storageNode.Path, StorageKey: storageNode.LeafKey.Hex(), CID: storageCIDStr, + MhKey: mhKey, NodeType: ResolveFromNodeType(storageNode.Type), } if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil { diff --git a/pkg/eth/publisher.go b/pkg/eth/publisher.go index ecc1a6ac..89169270 100644 --- a/pkg/eth/publisher.go +++ b/pkg/eth/publisher.go @@ -79,6 +79,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts) header := HeaderModel{ CID: headerCid, + MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), ParentHash: ipldPayload.Block.ParentHash().String(), BlockNumber: ipldPayload.Block.Number().String(), BlockHash: ipldPayload.Block.Hash().String(), @@ -102,6 +103,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64()) uncleCids[i] = UncleModel{ CID: uncleCid, + MhKey: shared.MultihashKeyFromCID(uncle.Cid()), ParentHash: uncle.ParentHash.String(), BlockHash: uncle.Hash().String(), Reward: uncleReward.String(), @@ -162,6 +164,7 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.EthTx, txTrie } trxCids[i] = TxModel{ CID: cid, + MhKey: shared.MultihashKeyFromCID(tx.Cid()), Index: trxMeta[i].Index, TxHash: trxMeta[i].TxHash, Src: trxMeta[i].Src, @@ -186,6 +189,7 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr } rctCids[rct.TxHash] = ReceiptModel{ CID: cid, + MhKey: shared.MultihashKeyFromCID(rct.Cid()), Contract: receiptMeta[i].Contract, ContractHash: receiptMeta[i].ContractHash, Topic0s: receiptMeta[i].Topic0s, @@ -220,6 +224,7 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM Path: stateNode.Path, StateKey: stateNode.LeafKey.String(), CID: cid, + MhKey: shared.MultihashKeyFromCID(node.Cid()), NodeType: ResolveFromNodeType(stateNode.Type), }) // If we have a leaf, decode the account to extract additional metadata for indexing @@ -266,6 +271,7 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode Path: storageNode.Path, StorageKey: storageNode.LeafKey.Hex(), CID: cid, + MhKey: shared.MultihashKeyFromCID(node.Cid()), NodeType: ResolveFromNodeType(storageNode.Type), }) } diff --git a/pkg/shared/functions.go b/pkg/shared/functions.go index 9116116c..9b364ec6 100644 --- a/pkg/shared/functions.go +++ b/pkg/shared/functions.go @@ -17,8 +17,6 @@ package shared import ( - "bytes" - "github.com/ethereum/go-ethereum/common" "github.com/ipfs/go-cid" "github.com/ipfs/go-ipfs-blockstore" @@ -26,51 +24,19 @@ import ( node "github.com/ipfs/go-ipld-format" "github.com/jmoiron/sqlx" "github.com/sirupsen/logrus" - - "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld" ) -// ListContainsString used to check if a list of strings contains a particular string -func ListContainsString(sss []string, s string) bool { - for _, str := range sss { - if s == str { - return true - } - } - return false -} - -// IPLDsContainBytes used to check if a list of strings contains a particular string -func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool { - for _, ipld := range iplds { - if bytes.Equal(ipld.Data, b) { - return true - } - } - return false -} - -// ListContainsGap used to check if a list of Gaps contains a particular Gap -func ListContainsGap(gapList []Gap, gap Gap) bool { - for _, listGap := range gapList { - if listGap == gap { - return true - } - } - return false -} - -// HandleNullAddrPointer will return an emtpy string for a nil address pointer -func HandleNullAddrPointer(to *common.Address) string { +// HandleZeroAddrPointer will return an emtpy string for a nil address pointer +func HandleZeroAddrPointer(to *common.Address) string { if to == nil { return "" } return to.Hex() } -// HandleNullAddr will return an empty string for a a null address -func HandleNullAddr(to common.Address) string { +// HandleZeroAddr will return an empty string for a 0 value address +func HandleZeroAddr(to common.Address) string { if to.Hex() == "0x0000000000000000000000000000000000000000" { return "" } @@ -93,7 +59,7 @@ func PublishIPLD(tx *sqlx.Tx, i node.Node) error { return err } -// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx +// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx and cid string func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) { mhKey, err := MultihashKeyFromCIDString(cid) if err != nil { @@ -104,6 +70,19 @@ func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) { return block, tx.Get(&block, pgStr, mhKey) } +// FetchIPLDByMhKey is used to retrieve an ipld from Postgres blockstore with the provided tx and mhkey string +func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) { + pgStr := `SELECT data FROM public.blocks WHERE key = $1` + var block []byte + return block, tx.Get(&block, pgStr, mhKey) +} + +// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string +func MultihashKeyFromCID(c cid.Cid) string { + dbKey := dshelp.MultihashToDsKey(c.Hash()) + return blockstore.BlockPrefix.String() + dbKey.String() +} + // MultihashKeyFromCIDString converts a cid string into a blockstore-prefixed multihash db key string func MultihashKeyFromCIDString(c string) (string, error) { dc, err := cid.Decode(c) diff --git a/pkg/shared/test_helpers.go b/pkg/shared/test_helpers.go index 8767e3b9..09291d2f 100644 --- a/pkg/shared/test_helpers.go +++ b/pkg/shared/test_helpers.go @@ -17,7 +17,13 @@ package shared import ( + "bytes" + + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multihash" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/config" + "github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/node" "github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres" ) @@ -30,3 +36,51 @@ func SetupDB() (*postgres.DB, error) { Port: 5432, }, node.Node{}) } + +// ListContainsString used to check if a list of strings contains a particular string +func ListContainsString(sss []string, s string) bool { + for _, str := range sss { + if s == str { + return true + } + } + return false +} + +// IPLDsContainBytes used to check if a list of strings contains a particular string +func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool { + for _, ipld := range iplds { + if bytes.Equal(ipld.Data, b) { + return true + } + } + return false +} + +// ListContainsGap used to check if a list of Gaps contains a particular Gap +func ListContainsGap(gapList []Gap, gap Gap) bool { + for _, listGap := range gapList { + if listGap == gap { + return true + } + } + return false +} + +// TestCID creates a basic CID for testing purposes +func TestCID(b []byte) cid.Cid { + pref := cid.Prefix{ + Version: 1, + Codec: cid.Raw, + MhType: multihash.KECCAK_256, + MhLength: -1, + } + c, _ := pref.Sum(b) + return c +} + +// PublishMockIPLD writes a mhkey-data pair to the public.blocks table so that test data can FK reference the mhkey +func PublishMockIPLD(db *postgres.DB, mhKey string, mockData []byte) error { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, mhKey, mockData) + return err +}