diff --git a/db/migrations/00032_create_header_cids_table.sql b/db/migrations/00032_create_header_cids_table.sql index 8d63f299..f66c4b45 100644 --- a/db/migrations/00032_create_header_cids_table.sql +++ b/db/migrations/00032_create_header_cids_table.sql @@ -5,6 +5,7 @@ CREATE TABLE public.header_cids ( block_hash VARCHAR(66) NOT NULL, cid TEXT NOT NULL, uncle BOOLEAN NOT NULL, + td BIGINT NOT NULL, UNIQUE (block_number, block_hash) ); diff --git a/db/schema.sql b/db/schema.sql index 8db21b45..abeb214b 100644 --- a/db/schema.sql +++ b/db/schema.sql @@ -3,7 +3,7 @@ -- -- Dumped from database version 10.10 --- Dumped by pg_dump version 11.5 +-- Dumped by pg_dump version 12.1 SET statement_timeout = 0; SET lock_timeout = 0; @@ -18,8 +18,6 @@ SET row_security = off; SET default_tablespace = ''; -SET default_with_oids = false; - -- -- Name: addresses; Type: TABLE; Schema: public; Owner: - -- @@ -316,7 +314,8 @@ CREATE TABLE public.header_cids ( block_number bigint NOT NULL, block_hash character varying(66) NOT NULL, cid text NOT NULL, - uncle boolean NOT NULL + uncle boolean NOT NULL, + td bigint NOT NULL ); diff --git a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go index cef5978c..bd5789be 100644 --- a/pkg/datastore/postgres/repositories/checked_headers_repository_test.go +++ b/pkg/datastore/postgres/repositories/checked_headers_repository_test.go @@ -17,6 +17,8 @@ package repositories_test import ( + "math/rand" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" @@ -25,7 +27,6 @@ import ( "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories" "github.com/vulcanize/vulcanizedb/pkg/fakes" "github.com/vulcanize/vulcanizedb/test_config" - "math/rand" ) var _ = Describe("Checked Headers repository", func() { diff --git a/pkg/datastore/postgres/repositories/full_sync_log_repository.go b/pkg/datastore/postgres/repositories/full_sync_log_repository.go index 9eff2035..e907043a 100644 --- a/pkg/datastore/postgres/repositories/full_sync_log_repository.go +++ b/pkg/datastore/postgres/repositories/full_sync_log_repository.go @@ -18,6 +18,7 @@ package repositories import ( "database/sql" + "github.com/sirupsen/logrus" "github.com/vulcanize/vulcanizedb/pkg/core" "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" diff --git a/pkg/fakes/data.go b/pkg/fakes/data.go index 52266f1a..ca143a2f 100644 --- a/pkg/fakes/data.go +++ b/pkg/fakes/data.go @@ -20,12 +20,14 @@ import ( "bytes" "encoding/json" "errors" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/vulcanize/vulcanizedb/pkg/core" "math/rand" "strconv" "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/vulcanize/vulcanizedb/pkg/core" ) var ( diff --git a/pkg/fakes/mock_rpc_client.go b/pkg/fakes/mock_rpc_client.go index 33e453b8..d384b0b3 100644 --- a/pkg/fakes/mock_rpc_client.go +++ b/pkg/fakes/mock_rpc_client.go @@ -19,12 +19,12 @@ package fakes import ( "context" "errors" - "github.com/ethereum/go-ethereum/statediff" "math/big" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" . "github.com/onsi/gomega" "github.com/vulcanize/vulcanizedb/pkg/core" diff --git a/pkg/ipfs/converter.go b/pkg/ipfs/converter.go index 93afb667..e47f8d70 100644 --- a/pkg/ipfs/converter.go +++ b/pkg/ipfs/converter.go @@ -45,17 +45,18 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *Converter { func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack block rlp to access fields block := new(types.Block) - decodeErr := rlp.DecodeBytes(payload.BlockRlp, block) - if decodeErr != nil { - return nil, decodeErr + err := rlp.DecodeBytes(payload.BlockRlp, block) + if err != nil { + return nil, err } header := block.Header() - headerRlp, encodeErr := rlp.EncodeToBytes(header) - if encodeErr != nil { - return nil, encodeErr + headerRlp, err := rlp.EncodeToBytes(header) + if err != nil { + return nil, err } trxLen := len(block.Transactions()) convertedPayload := &IPLDPayload{ + TotalDifficulty: payload.TotalDifficulty, BlockHash: block.Hash(), BlockNumber: block.Number(), HeaderRLP: headerRlp, @@ -70,9 +71,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { transactions := block.Transactions() for _, trx := range transactions { // Extract to and from data from the the transactions for indexing - from, senderErr := types.Sender(signer, trx) - if senderErr != nil { - return nil, senderErr + from, err := types.Sender(signer, trx) + if err != nil { + return nil, err } txMeta := &TrxMetaData{ Dst: handleNullAddr(trx.To()), @@ -84,14 +85,12 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Decode receipts for this block receipts := make(types.Receipts, 0) - decodeErr = rlp.DecodeBytes(payload.ReceiptsRlp, &receipts) - if decodeErr != nil { - return nil, decodeErr + if err := rlp.DecodeBytes(payload.ReceiptsRlp, &receipts); err != nil { + return nil, err } // Derive any missing fields - deriveErr := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()) - if deriveErr != nil { - return nil, deriveErr + if err := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil { + return nil, err } for i, receipt := range receipts { // If the transaction for this receipt has a "to" address, the above DeriveFields() fails to assign it to the receipt's ContractAddress @@ -118,9 +117,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) { // Unpack state diff rlp to access fields stateDiff := new(statediff.StateDiff) - decodeErr = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff) - if decodeErr != nil { - return nil, decodeErr + if err = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff); err != nil { + return nil, err } for _, createdAccount := range stateDiff.CreatedAccounts { hashKey := common.BytesToHash(createdAccount.Key) diff --git a/pkg/ipfs/converter_test.go b/pkg/ipfs/converter_test.go index 5fd35009..fc346b40 100644 --- a/pkg/ipfs/converter_test.go +++ b/pkg/ipfs/converter_test.go @@ -36,6 +36,7 @@ var _ = Describe("Converter", func() { Expect(converterPayload.BlockHash).To(Equal(mocks.MockBlock.Hash())) Expect(converterPayload.StateNodes).To(Equal(mocks.MockStateNodes)) Expect(converterPayload.StorageNodes).To(Equal(mocks.MockStorageNodes)) + Expect(converterPayload.TotalDifficulty.Int64()).To(Equal(mocks.MockStateDiffPayload.TotalDifficulty.Int64())) gotBody, err := rlp.EncodeToBytes(converterPayload.BlockBody) Expect(err).ToNot(HaveOccurred()) expectedBody, err := rlp.EncodeToBytes(mocks.MockBlock.Body()) diff --git a/pkg/ipfs/mocks/test_data.go b/pkg/ipfs/mocks/test_data.go index 1f696938..9c42df8f 100644 --- a/pkg/ipfs/mocks/test_data.go +++ b/pkg/ipfs/mocks/test_data.go @@ -161,17 +161,19 @@ var ( // aggregate payloads MockStateDiffPayload = statediff.Payload{ - BlockRlp: MockBlockRlp, - StateDiffRlp: MockStateDiffBytes, - ReceiptsRlp: ReceiptsRlp, + BlockRlp: MockBlockRlp, + StateDiffRlp: MockStateDiffBytes, + ReceiptsRlp: ReceiptsRlp, + TotalDifficulty: big.NewInt(1337), } MockIPLDPayload = &ipfs.IPLDPayload{ - BlockNumber: big.NewInt(1), - BlockHash: MockBlock.Hash(), - Receipts: MockReceipts, - HeaderRLP: MockHeaderRlp, - BlockBody: MockBlock.Body(), + TotalDifficulty: big.NewInt(1337), + BlockNumber: big.NewInt(1), + BlockHash: MockBlock.Hash(), + Receipts: MockReceipts, + HeaderRLP: MockHeaderRlp, + BlockBody: MockBlock.Body(), TrxMetaData: []*ipfs.TrxMetaData{ { CID: "", @@ -205,10 +207,11 @@ var ( } MockCIDPayload = &ipfs.CIDPayload{ - BlockNumber: "1", - BlockHash: MockBlock.Hash(), - HeaderCID: "mockHeaderCID", - UncleCIDs: make(map[common.Hash]string), + TotalDifficulty: "1337", + BlockNumber: "1", + BlockHash: MockBlock.Hash(), + HeaderCID: "mockHeaderCID", + UncleCIDs: make(map[common.Hash]string), TransactionCIDs: map[common.Hash]*ipfs.TrxMetaData{ MockTransactions[0].Hash(): { CID: "mockTrxCID1", diff --git a/pkg/ipfs/publisher.go b/pkg/ipfs/publisher.go index b63e8621..f44c09ca 100644 --- a/pkg/ipfs/publisher.go +++ b/pkg/ipfs/publisher.go @@ -63,51 +63,52 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) { // Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) { // Process and publish headers - headerCid, headersErr := pub.publishHeaders(payload.HeaderRLP) - if headersErr != nil { - return nil, headersErr + headerCid, err := pub.publishHeaders(payload.HeaderRLP) + if err != nil { + return nil, err } // Process and publish uncles uncleCids := make(map[common.Hash]string) for _, uncle := range payload.BlockBody.Uncles { - uncleRlp, encodeErr := rlp.EncodeToBytes(uncle) - if encodeErr != nil { - return nil, encodeErr + uncleRlp, err := rlp.EncodeToBytes(uncle) + if err != nil { + return nil, err } - cid, unclesErr := pub.publishHeaders(uncleRlp) - if unclesErr != nil { - return nil, unclesErr + cid, err := pub.publishHeaders(uncleRlp) + if err != nil { + return nil, err } uncleCids[uncle.Hash()] = cid } // Process and publish transactions - transactionCids, trxsErr := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData) - if trxsErr != nil { - return nil, trxsErr + transactionCids, err := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData) + if err != nil { + return nil, err } // Process and publish receipts - receiptsCids, rctsErr := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData) - if rctsErr != nil { - return nil, rctsErr + receiptsCids, err := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData) + if err != nil { + return nil, err } // Process and publish state leafs - stateNodeCids, stateErr := pub.publishStateNodes(payload.StateNodes) - if stateErr != nil { - return nil, stateErr + stateNodeCids, err := pub.publishStateNodes(payload.StateNodes) + if err != nil { + return nil, err } // Process and publish storage leafs - storageNodeCids, storageErr := pub.publishStorageNodes(payload.StorageNodes) - if storageErr != nil { - return nil, storageErr + storageNodeCids, err := pub.publishStorageNodes(payload.StorageNodes) + if err != nil { + return nil, err } // Package CIDs and their metadata into a single struct return &CIDPayload{ + TotalDifficulty: payload.TotalDifficulty.String(), BlockHash: payload.BlockHash, BlockNumber: payload.BlockNumber.String(), HeaderCID: headerCid, diff --git a/pkg/ipfs/publisher_test.go b/pkg/ipfs/publisher_test.go index 87f5164b..6e34c13c 100644 --- a/pkg/ipfs/publisher_test.go +++ b/pkg/ipfs/publisher_test.go @@ -63,6 +63,7 @@ var _ = Describe("Publisher", func() { } cidPayload, err := publisher.Publish(mocks.MockIPLDPayload) Expect(err).ToNot(HaveOccurred()) + Expect(cidPayload.TotalDifficulty).To(Equal(mocks.MockIPLDPayload.TotalDifficulty.String())) Expect(cidPayload.BlockNumber).To(Equal(mocks.MockCIDPayload.BlockNumber)) Expect(cidPayload.BlockHash).To(Equal(mocks.MockCIDPayload.BlockHash)) Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs)) diff --git a/pkg/ipfs/types.go b/pkg/ipfs/types.go index 5854e78c..f3e88b3f 100644 --- a/pkg/ipfs/types.go +++ b/pkg/ipfs/types.go @@ -49,6 +49,7 @@ type IPLDWrapper struct { // IPLDPayload is a custom type which packages raw ETH data for the IPFS publisher type IPLDPayload struct { HeaderRLP []byte + TotalDifficulty *big.Int BlockNumber *big.Int BlockHash common.Hash BlockBody *types.Body @@ -76,6 +77,7 @@ type StorageNode struct { type CIDPayload struct { BlockNumber string BlockHash common.Hash + TotalDifficulty string HeaderCID string UncleCIDs map[common.Hash]string TransactionCIDs map[common.Hash]*TrxMetaData diff --git a/pkg/super_node/repository.go b/pkg/super_node/repository.go index 5a824b37..b65fa3c9 100644 --- a/pkg/super_node/repository.go +++ b/pkg/super_node/repository.go @@ -44,78 +44,71 @@ func NewCIDRepository(db *postgres.DB) *Repository { // Index indexes a cidPayload in Postgres func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error { - tx, beginErr := repo.db.Beginx() - if beginErr != nil { - return beginErr + tx, err := repo.db.Beginx() + if err != nil { + return err } - headerID, headerErr := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex()) - if headerErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex(), cidPayload.TotalDifficulty) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } - return headerErr + return err } for uncleHash, cid := range cidPayload.UncleCIDs { - uncleErr := repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex()) - if uncleErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + err := repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex(), cidPayload.TotalDifficulty) + if err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } - return uncleErr + return err } } - trxAndRctErr := repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID) - if trxAndRctErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + if err := repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID); err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } - return trxAndRctErr + return err } - stateAndStorageErr := repo.indexStateAndStorageCIDs(tx, cidPayload, headerID) - if stateAndStorageErr != nil { - rollbackErr := tx.Rollback() - if rollbackErr != nil { - log.Error(rollbackErr) + if err := repo.indexStateAndStorageCIDs(tx, cidPayload, headerID); err != nil { + if err := tx.Rollback(); err != nil { + log.Error(err) } - return stateAndStorageErr + return err } return tx.Commit() } -func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) { +func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash, td string) (int64, error) { var headerID int64 - err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4) + err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle, td) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle, td) = ($3, $4, $5) RETURNING id`, - blockNumber, hash, cid, false).Scan(&headerID) + blockNumber, hash, cid, false, td).Scan(&headerID) return headerID, err } -func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error { - _, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)`, - blockNumber, hash, cid, true) +func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash, td string) error { + _, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle, td) VALUES ($1, $2, $3, $4, $5) + ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle, td) = ($3, $4, $5)`, + blockNumber, hash, cid, true, td) return err } func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for hash, trxCidMeta := range payload.TransactionCIDs { var txID int64 - queryErr := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) + err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5) RETURNING id`, headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID) - if queryErr != nil { - return queryErr + if err != nil { + return err } receiptCidMeta, ok := payload.ReceiptCIDs[hash] if ok { - rctErr := repo.indexReceiptCID(tx, receiptCidMeta, txID) - if rctErr != nil { - return rctErr + if err := repo.indexReceiptCID(tx, receiptCidMeta, txID); err != nil { + return err } } } @@ -131,17 +124,16 @@ func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ipfs.ReceiptMetaDa func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error { for accountKey, stateCID := range payload.StateNodeCIDs { var stateID int64 - queryErr := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) + err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4) ON CONFLICT (header_id, state_key) DO UPDATE SET (cid, leaf) = ($3, $4) RETURNING id`, headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID) - if queryErr != nil { - return queryErr + if err != nil { + return err } for _, storageCID := range payload.StorageNodeCIDs[accountKey] { - storageErr := repo.indexStorageCID(tx, storageCID, stateID) - if storageErr != nil { - return storageErr + if err := repo.indexStorageCID(tx, storageCID, stateID); err != nil { + return err } } } diff --git a/pkg/super_node/repository_test.go b/pkg/super_node/repository_test.go index 77cc5ed1..a21ef665 100644 --- a/pkg/super_node/repository_test.go +++ b/pkg/super_node/repository_test.go @@ -19,7 +19,6 @@ package super_node_test import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/vulcanize/vulcanizedb/pkg/datastore/postgres" "github.com/vulcanize/vulcanizedb/pkg/ipfs" "github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks" @@ -45,14 +44,18 @@ var _ = Describe("Repository", func() { It("Indexes CIDs and related metadata into vulcanizedb", func() { err = repo.Index(mocks.MockCIDPayload) Expect(err).ToNot(HaveOccurred()) - pgStr := `SELECT cid FROM header_cids + pgStr := `SELECT cid, td FROM header_cids WHERE block_number = $1 AND uncle IS FALSE` // check header was properly indexed - headers := make([]string, 0) - err = db.Select(&headers, pgStr, 1) + type res struct { + CID string + TD string + } + headers := new(res) + err = db.QueryRowx(pgStr, 1).StructScan(headers) Expect(err).ToNot(HaveOccurred()) - Expect(len(headers)).To(Equal(1)) - Expect(headers[0]).To(Equal("mockHeaderCID")) + Expect(headers.CID).To(Equal("mockHeaderCID")) + Expect(headers.TD).To(Equal("1337")) // check trxs were properly indexed trxs := make([]string, 0) pgStr = `SELECT transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id)