support for total difficulty (needed to support some eth endpoints)

This commit is contained in:
Ian Norden 2020-01-16 17:21:30 -06:00
parent 358575335b
commit 0785507a7d
14 changed files with 118 additions and 113 deletions

View File

@ -5,6 +5,7 @@ CREATE TABLE public.header_cids (
block_hash VARCHAR(66) NOT NULL,
cid TEXT NOT NULL,
uncle BOOLEAN NOT NULL,
td BIGINT NOT NULL,
UNIQUE (block_number, block_hash)
);

View File

@ -3,7 +3,7 @@
--
-- Dumped from database version 10.10
-- Dumped by pg_dump version 11.5
-- Dumped by pg_dump version 12.1
SET statement_timeout = 0;
SET lock_timeout = 0;
@ -18,8 +18,6 @@ SET row_security = off;
SET default_tablespace = '';
SET default_with_oids = false;
--
-- Name: addresses; Type: TABLE; Schema: public; Owner: -
--
@ -316,7 +314,8 @@ CREATE TABLE public.header_cids (
block_number bigint NOT NULL,
block_hash character varying(66) NOT NULL,
cid text NOT NULL,
uncle boolean NOT NULL
uncle boolean NOT NULL,
td bigint NOT NULL
);

View File

@ -17,6 +17,8 @@
package repositories_test
import (
"math/rand"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"
@ -25,7 +27,6 @@ import (
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres/repositories"
"github.com/vulcanize/vulcanizedb/pkg/fakes"
"github.com/vulcanize/vulcanizedb/test_config"
"math/rand"
)
var _ = Describe("Checked Headers repository", func() {

View File

@ -18,6 +18,7 @@ package repositories
import (
"database/sql"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/core"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"

View File

@ -20,12 +20,14 @@ import (
"bytes"
"encoding/json"
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
"math/rand"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/vulcanize/vulcanizedb/pkg/core"
)
var (

View File

@ -19,12 +19,12 @@ package fakes
import (
"context"
"errors"
"github.com/ethereum/go-ethereum/statediff"
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/core"

View File

@ -45,17 +45,18 @@ func NewPayloadConverter(chainConfig *params.ChainConfig) *Converter {
func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Unpack block rlp to access fields
block := new(types.Block)
decodeErr := rlp.DecodeBytes(payload.BlockRlp, block)
if decodeErr != nil {
return nil, decodeErr
err := rlp.DecodeBytes(payload.BlockRlp, block)
if err != nil {
return nil, err
}
header := block.Header()
headerRlp, encodeErr := rlp.EncodeToBytes(header)
if encodeErr != nil {
return nil, encodeErr
headerRlp, err := rlp.EncodeToBytes(header)
if err != nil {
return nil, err
}
trxLen := len(block.Transactions())
convertedPayload := &IPLDPayload{
TotalDifficulty: payload.TotalDifficulty,
BlockHash: block.Hash(),
BlockNumber: block.Number(),
HeaderRLP: headerRlp,
@ -70,9 +71,9 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
transactions := block.Transactions()
for _, trx := range transactions {
// Extract to and from data from the the transactions for indexing
from, senderErr := types.Sender(signer, trx)
if senderErr != nil {
return nil, senderErr
from, err := types.Sender(signer, trx)
if err != nil {
return nil, err
}
txMeta := &TrxMetaData{
Dst: handleNullAddr(trx.To()),
@ -84,14 +85,12 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Decode receipts for this block
receipts := make(types.Receipts, 0)
decodeErr = rlp.DecodeBytes(payload.ReceiptsRlp, &receipts)
if decodeErr != nil {
return nil, decodeErr
if err := rlp.DecodeBytes(payload.ReceiptsRlp, &receipts); err != nil {
return nil, err
}
// Derive any missing fields
deriveErr := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions())
if deriveErr != nil {
return nil, deriveErr
if err := receipts.DeriveFields(pc.chainConfig, block.Hash(), block.NumberU64(), block.Transactions()); err != nil {
return nil, err
}
for i, receipt := range receipts {
// If the transaction for this receipt has a "to" address, the above DeriveFields() fails to assign it to the receipt's ContractAddress
@ -118,9 +117,8 @@ func (pc *Converter) Convert(payload statediff.Payload) (*IPLDPayload, error) {
// Unpack state diff rlp to access fields
stateDiff := new(statediff.StateDiff)
decodeErr = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff)
if decodeErr != nil {
return nil, decodeErr
if err = rlp.DecodeBytes(payload.StateDiffRlp, stateDiff); err != nil {
return nil, err
}
for _, createdAccount := range stateDiff.CreatedAccounts {
hashKey := common.BytesToHash(createdAccount.Key)

View File

@ -36,6 +36,7 @@ var _ = Describe("Converter", func() {
Expect(converterPayload.BlockHash).To(Equal(mocks.MockBlock.Hash()))
Expect(converterPayload.StateNodes).To(Equal(mocks.MockStateNodes))
Expect(converterPayload.StorageNodes).To(Equal(mocks.MockStorageNodes))
Expect(converterPayload.TotalDifficulty.Int64()).To(Equal(mocks.MockStateDiffPayload.TotalDifficulty.Int64()))
gotBody, err := rlp.EncodeToBytes(converterPayload.BlockBody)
Expect(err).ToNot(HaveOccurred())
expectedBody, err := rlp.EncodeToBytes(mocks.MockBlock.Body())

View File

@ -164,9 +164,11 @@ var (
BlockRlp: MockBlockRlp,
StateDiffRlp: MockStateDiffBytes,
ReceiptsRlp: ReceiptsRlp,
TotalDifficulty: big.NewInt(1337),
}
MockIPLDPayload = &ipfs.IPLDPayload{
TotalDifficulty: big.NewInt(1337),
BlockNumber: big.NewInt(1),
BlockHash: MockBlock.Hash(),
Receipts: MockReceipts,
@ -205,6 +207,7 @@ var (
}
MockCIDPayload = &ipfs.CIDPayload{
TotalDifficulty: "1337",
BlockNumber: "1",
BlockHash: MockBlock.Hash(),
HeaderCID: "mockHeaderCID",

View File

@ -63,51 +63,52 @@ func NewIPLDPublisher(ipfsPath string) (*Publisher, error) {
// Publish publishes an IPLDPayload to IPFS and returns the corresponding CIDPayload
func (pub *Publisher) Publish(payload *IPLDPayload) (*CIDPayload, error) {
// Process and publish headers
headerCid, headersErr := pub.publishHeaders(payload.HeaderRLP)
if headersErr != nil {
return nil, headersErr
headerCid, err := pub.publishHeaders(payload.HeaderRLP)
if err != nil {
return nil, err
}
// Process and publish uncles
uncleCids := make(map[common.Hash]string)
for _, uncle := range payload.BlockBody.Uncles {
uncleRlp, encodeErr := rlp.EncodeToBytes(uncle)
if encodeErr != nil {
return nil, encodeErr
uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil {
return nil, err
}
cid, unclesErr := pub.publishHeaders(uncleRlp)
if unclesErr != nil {
return nil, unclesErr
cid, err := pub.publishHeaders(uncleRlp)
if err != nil {
return nil, err
}
uncleCids[uncle.Hash()] = cid
}
// Process and publish transactions
transactionCids, trxsErr := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData)
if trxsErr != nil {
return nil, trxsErr
transactionCids, err := pub.publishTransactions(payload.BlockBody, payload.TrxMetaData)
if err != nil {
return nil, err
}
// Process and publish receipts
receiptsCids, rctsErr := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData)
if rctsErr != nil {
return nil, rctsErr
receiptsCids, err := pub.publishReceipts(payload.Receipts, payload.ReceiptMetaData)
if err != nil {
return nil, err
}
// Process and publish state leafs
stateNodeCids, stateErr := pub.publishStateNodes(payload.StateNodes)
if stateErr != nil {
return nil, stateErr
stateNodeCids, err := pub.publishStateNodes(payload.StateNodes)
if err != nil {
return nil, err
}
// Process and publish storage leafs
storageNodeCids, storageErr := pub.publishStorageNodes(payload.StorageNodes)
if storageErr != nil {
return nil, storageErr
storageNodeCids, err := pub.publishStorageNodes(payload.StorageNodes)
if err != nil {
return nil, err
}
// Package CIDs and their metadata into a single struct
return &CIDPayload{
TotalDifficulty: payload.TotalDifficulty.String(),
BlockHash: payload.BlockHash,
BlockNumber: payload.BlockNumber.String(),
HeaderCID: headerCid,

View File

@ -63,6 +63,7 @@ var _ = Describe("Publisher", func() {
}
cidPayload, err := publisher.Publish(mocks.MockIPLDPayload)
Expect(err).ToNot(HaveOccurred())
Expect(cidPayload.TotalDifficulty).To(Equal(mocks.MockIPLDPayload.TotalDifficulty.String()))
Expect(cidPayload.BlockNumber).To(Equal(mocks.MockCIDPayload.BlockNumber))
Expect(cidPayload.BlockHash).To(Equal(mocks.MockCIDPayload.BlockHash))
Expect(cidPayload.UncleCIDs).To(Equal(mocks.MockCIDPayload.UncleCIDs))

View File

@ -49,6 +49,7 @@ type IPLDWrapper struct {
// IPLDPayload is a custom type which packages raw ETH data for the IPFS publisher
type IPLDPayload struct {
HeaderRLP []byte
TotalDifficulty *big.Int
BlockNumber *big.Int
BlockHash common.Hash
BlockBody *types.Body
@ -76,6 +77,7 @@ type StorageNode struct {
type CIDPayload struct {
BlockNumber string
BlockHash common.Hash
TotalDifficulty string
HeaderCID string
UncleCIDs map[common.Hash]string
TransactionCIDs map[common.Hash]*TrxMetaData

View File

@ -44,78 +44,71 @@ func NewCIDRepository(db *postgres.DB) *Repository {
// Index indexes a cidPayload in Postgres
func (repo *Repository) Index(cidPayload *ipfs.CIDPayload) error {
tx, beginErr := repo.db.Beginx()
if beginErr != nil {
return beginErr
tx, err := repo.db.Beginx()
if err != nil {
return err
}
headerID, headerErr := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex())
if headerErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error(rollbackErr)
headerID, err := repo.indexHeaderCID(tx, cidPayload.HeaderCID, cidPayload.BlockNumber, cidPayload.BlockHash.Hex(), cidPayload.TotalDifficulty)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
return headerErr
return err
}
for uncleHash, cid := range cidPayload.UncleCIDs {
uncleErr := repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex())
if uncleErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error(rollbackErr)
err := repo.indexUncleCID(tx, cid, cidPayload.BlockNumber, uncleHash.Hex(), cidPayload.TotalDifficulty)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
return uncleErr
return err
}
}
trxAndRctErr := repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID)
if trxAndRctErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error(rollbackErr)
if err := repo.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
return trxAndRctErr
return err
}
stateAndStorageErr := repo.indexStateAndStorageCIDs(tx, cidPayload, headerID)
if stateAndStorageErr != nil {
rollbackErr := tx.Rollback()
if rollbackErr != nil {
log.Error(rollbackErr)
if err := repo.indexStateAndStorageCIDs(tx, cidPayload, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
return stateAndStorageErr
return err
}
return tx.Commit()
}
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash string) (int64, error) {
func (repo *Repository) indexHeaderCID(tx *sqlx.Tx, cid, blockNumber, hash, td string) (int64, error) {
var headerID int64
err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)
err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle, td) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle, td) = ($3, $4, $5)
RETURNING id`,
blockNumber, hash, cid, false).Scan(&headerID)
blockNumber, hash, cid, false, td).Scan(&headerID)
return headerID, err
}
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash string) error {
_, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle) VALUES ($1, $2, $3, $4)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle) = ($3, $4)`,
blockNumber, hash, cid, true)
func (repo *Repository) indexUncleCID(tx *sqlx.Tx, cid, blockNumber, hash, td string) error {
_, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, cid, uncle, td) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (cid, uncle, td) = ($3, $4, $5)`,
blockNumber, hash, cid, true, td)
return err
}
func (repo *Repository) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error {
for hash, trxCidMeta := range payload.TransactionCIDs {
var txID int64
queryErr := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5)
RETURNING id`,
headerID, hash.Hex(), trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID)
if queryErr != nil {
return queryErr
if err != nil {
return err
}
receiptCidMeta, ok := payload.ReceiptCIDs[hash]
if ok {
rctErr := repo.indexReceiptCID(tx, receiptCidMeta, txID)
if rctErr != nil {
return rctErr
if err := repo.indexReceiptCID(tx, receiptCidMeta, txID); err != nil {
return err
}
}
}
@ -131,17 +124,16 @@ func (repo *Repository) indexReceiptCID(tx *sqlx.Tx, cidMeta *ipfs.ReceiptMetaDa
func (repo *Repository) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *ipfs.CIDPayload, headerID int64) error {
for accountKey, stateCID := range payload.StateNodeCIDs {
var stateID int64
queryErr := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4)
err := tx.QueryRowx(`INSERT INTO public.state_cids (header_id, state_key, cid, leaf) VALUES ($1, $2, $3, $4)
ON CONFLICT (header_id, state_key) DO UPDATE SET (cid, leaf) = ($3, $4)
RETURNING id`,
headerID, accountKey.Hex(), stateCID.CID, stateCID.Leaf).Scan(&stateID)
if queryErr != nil {
return queryErr
if err != nil {
return err
}
for _, storageCID := range payload.StorageNodeCIDs[accountKey] {
storageErr := repo.indexStorageCID(tx, storageCID, stateID)
if storageErr != nil {
return storageErr
if err := repo.indexStorageCID(tx, storageCID, stateID); err != nil {
return err
}
}
}

View File

@ -19,7 +19,6 @@ package super_node_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/ipfs/mocks"
@ -45,14 +44,18 @@ var _ = Describe("Repository", func() {
It("Indexes CIDs and related metadata into vulcanizedb", func() {
err = repo.Index(mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT cid FROM header_cids
pgStr := `SELECT cid, td FROM header_cids
WHERE block_number = $1 AND uncle IS FALSE`
// check header was properly indexed
headers := make([]string, 0)
err = db.Select(&headers, pgStr, 1)
type res struct {
CID string
TD string
}
headers := new(res)
err = db.QueryRowx(pgStr, 1).StructScan(headers)
Expect(err).ToNot(HaveOccurred())
Expect(len(headers)).To(Equal(1))
Expect(headers[0]).To(Equal("mockHeaderCID"))
Expect(headers.CID).To(Equal("mockHeaderCID"))
Expect(headers.TD).To(Equal("1337"))
// check trxs were properly indexed
trxs := make([]string, 0)
pgStr = `SELECT transaction_cids.cid FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id)