split uncles into their own table; add tx index to the tx index (lol); continue work on geth api endpoints

This commit is contained in:
Ian Norden 2020-01-26 13:55:26 -06:00
parent e52284b9ba
commit 33ac5978f5
21 changed files with 605 additions and 83 deletions

View File

@ -80,6 +80,7 @@ func superNode() {
}
backFiller.FillGaps(wg, nil)
}
wg.Wait()
}
func newSuperNode() (super_node.SuperNode, *config.SuperNode, error) {

View File

@ -5,7 +5,6 @@ CREATE TABLE public.header_cids (
block_hash VARCHAR(66) NOT NULL,
parent_hash VARCHAR(66) NOT NULL,
cid TEXT NOT NULL,
uncle BOOLEAN NOT NULL,
td BIGINT,
UNIQUE (block_number, block_hash)
);

View File

@ -0,0 +1,12 @@
-- +goose Up
CREATE TABLE public.uncle_cids (
id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
block_hash VARCHAR(66) NOT NULL,
parent_hash VARCHAR(66) NOT NULL,
cid TEXT NOT NULL,
UNIQUE (header_id, block_hash)
);
-- +goose Down
DROP TABLE public.uncle_cids;

View File

@ -3,6 +3,7 @@ CREATE TABLE public.transaction_cids (
id SERIAL PRIMARY KEY,
header_id INTEGER NOT NULL REFERENCES header_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
tx_hash VARCHAR(66) NOT NULL,
index INTEGER NOT NULL,
cid TEXT NOT NULL,
dst VARCHAR(66) NOT NULL,
src VARCHAR(66) NOT NULL,

View File

@ -315,7 +315,6 @@ CREATE TABLE public.header_cids (
block_hash character varying(66) NOT NULL,
parent_hash character varying(66) NOT NULL,
cid text NOT NULL,
uncle boolean NOT NULL,
td bigint
);
@ -728,6 +727,7 @@ CREATE TABLE public.transaction_cids (
id integer NOT NULL,
header_id integer NOT NULL,
tx_hash character varying(66) NOT NULL,
index integer NOT NULL,
cid text NOT NULL,
dst character varying(66) NOT NULL,
src character varying(66) NOT NULL
@ -754,6 +754,39 @@ CREATE SEQUENCE public.transaction_cids_id_seq
ALTER SEQUENCE public.transaction_cids_id_seq OWNED BY public.transaction_cids.id;
--
-- Name: uncle_cids; Type: TABLE; Schema: public; Owner: -
--
CREATE TABLE public.uncle_cids (
id integer NOT NULL,
header_id integer NOT NULL,
block_hash character varying(66) NOT NULL,
parent_hash character varying(66) NOT NULL,
cid text NOT NULL
);
--
-- Name: uncle_cids_id_seq; Type: SEQUENCE; Schema: public; Owner: -
--
CREATE SEQUENCE public.uncle_cids_id_seq
AS integer
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1;
--
-- Name: uncle_cids_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: -
--
ALTER SEQUENCE public.uncle_cids_id_seq OWNED BY public.uncle_cids.id;
--
-- Name: uncles; Type: TABLE; Schema: public; Owner: -
--
@ -1016,6 +1049,13 @@ ALTER TABLE ONLY public.storage_diff ALTER COLUMN id SET DEFAULT nextval('public
ALTER TABLE ONLY public.transaction_cids ALTER COLUMN id SET DEFAULT nextval('public.transaction_cids_id_seq'::regclass);
--
-- Name: uncle_cids id; Type: DEFAULT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.uncle_cids ALTER COLUMN id SET DEFAULT nextval('public.uncle_cids_id_seq'::regclass);
--
-- Name: uncles id; Type: DEFAULT; Schema: public; Owner: -
--
@ -1317,6 +1357,22 @@ ALTER TABLE ONLY public.transaction_cids
ADD CONSTRAINT transaction_cids_pkey PRIMARY KEY (id);
--
-- Name: uncle_cids uncle_cids_header_id_block_hash_key; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.uncle_cids
ADD CONSTRAINT uncle_cids_header_id_block_hash_key UNIQUE (header_id, block_hash);
--
-- Name: uncle_cids uncle_cids_pkey; Type: CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.uncle_cids
ADD CONSTRAINT uncle_cids_pkey PRIMARY KEY (id);
--
-- Name: uncles uncles_block_id_hash_key; Type: CONSTRAINT; Schema: public; Owner: -
--
@ -1578,6 +1634,14 @@ ALTER TABLE ONLY public.transaction_cids
ADD CONSTRAINT transaction_cids_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.header_cids(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;
--
-- Name: uncle_cids uncle_cids_header_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--
ALTER TABLE ONLY public.uncle_cids
ADD CONSTRAINT uncle_cids_header_id_fkey FOREIGN KEY (header_id) REFERENCES public.header_cids(id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED;
--
-- Name: uncles uncles_block_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: -
--

View File

@ -133,11 +133,61 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
return extractLogsOfInterest(rctIPLDs, filter.Topics)
}
// GetHeaderByNumber returns the requested canonical block header.
// * When blockNr is -1 the chain head is returned.
// * We cannot support pending block calls since we do not have an active miner
func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
header, err := pea.b.HeaderByNumber(ctx, number)
if header != nil && err == nil {
return pea.rpcMarshalHeader(header)
}
return nil, err
}
// GetBlockByNumber returns the requested canonical block.
// * When blockNr is -1 the chain head is returned.
// * We cannot support pending block calls since we do not have an active miner
// * When fullTx is true all transactions in the block are returned, otherwise
// only the transaction hash is returned.
func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) {
block, err := pea.b.BlockByNumber(ctx, number)
if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
return nil, err
}
// GetBlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full
// detail, otherwise only the transaction hash is returned.
func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
block, err := pea.b.BlockByHash(ctx, hash)
if block != nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
return nil, err
}
// GetTransactionByHash returns the transaction for the given hash
// SuperNode cannot currently handle pending/tx_pool txs
func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := pea.b.GetTransaction(ctx, hash)
if err != nil {
return nil, err
}
if tx != nil {
return newRPCTransaction(tx, blockHash, blockNumber, index), nil
}
// Transaction unknown, return as such
return nil, nil
}
// extractLogsOfInterest returns logs from the receipt IPLD
func extractLogsOfInterest(rctIPLDs []blocks.Block, wantedTopics [][]string) ([]*types.Log, error) {
var logs []*types.Log
for _, rctIPLD := range rctIPLDs {
rctRLP := rctIPLD.RawData()
var rct types.ReceiptForStorage
var rct types.Receipt
if err := rlp.DecodeBytes(rctRLP, &rct); err != nil {
return nil, err
}
@ -180,17 +230,6 @@ func sliceContainsHash(slice []string, hash common.Hash) int {
return 0
}
// GetHeaderByNumber returns the requested canonical block header.
// When blockNr is -1 the chain head is returned.
// We cannot support pending block calls since we do not have an active miner
func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
header, err := pea.b.HeaderByNumber(ctx, number)
if header != nil && err == nil {
return pea.rpcMarshalHeader(header)
}
return nil, err
}
// rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field, which requires
// a `PublicEthAPI`.
func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) {
@ -203,7 +242,8 @@ func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]inte
return fields, nil
}
// RPCMarshalHeader converts the given header to the RPC output .
// RPCMarshalHeader converts the given header to the RPC output.
// This function is eth/internal so we have to make our own version here...
func RPCMarshalHeader(head *types.Header) map[string]interface{} {
return map[string]interface{}{
"number": (*hexutil.Big)(head.Number),
@ -225,3 +265,122 @@ func RPCMarshalHeader(head *types.Header) map[string]interface{} {
"receiptsRoot": head.ReceiptHash,
}
}
// rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field, which requires
// a `PublicBlockchainAPI`.
func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields, err := RPCMarshalBlock(b, inclTx, fullTx)
if err != nil {
return nil, err
}
td, err := pea.b.GetTd(b.Hash())
if err != nil {
return nil, err
}
fields["totalDifficulty"] = (*hexutil.Big)(td)
return fields, err
}
// RPCMarshalBlock converts the given block to the RPC output which depends on fullTx. If inclTx is true transactions are
// returned. When fullTx is true the returned block contains full transaction details, otherwise it will only contain
// transaction hashes.
func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields := RPCMarshalHeader(block.Header())
fields["size"] = hexutil.Uint64(block.Size())
if inclTx {
formatTx := func(tx *types.Transaction) (interface{}, error) {
return tx.Hash(), nil
}
if fullTx {
formatTx = func(tx *types.Transaction) (interface{}, error) {
return newRPCTransactionFromBlockHash(block, tx.Hash()), nil
}
}
txs := block.Transactions()
transactions := make([]interface{}, len(txs))
var err error
for i, tx := range txs {
if transactions[i], err = formatTx(tx); err != nil {
return nil, err
}
}
fields["transactions"] = transactions
}
uncles := block.Uncles()
uncleHashes := make([]common.Hash, len(uncles))
for i, uncle := range uncles {
uncleHashes[i] = uncle.Hash()
}
fields["uncles"] = uncleHashes
return fields, nil
}
// newRPCTransactionFromBlockHash returns a transaction that will serialize to the RPC representation.
func newRPCTransactionFromBlockHash(b *types.Block, hash common.Hash) *RPCTransaction {
for idx, tx := range b.Transactions() {
if tx.Hash() == hash {
return newRPCTransactionFromBlockIndex(b, uint64(idx))
}
}
return nil
}
// newRPCTransactionFromBlockIndex returns a transaction that will serialize to the RPC representation.
func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransaction {
txs := b.Transactions()
if index >= uint64(len(txs)) {
return nil
}
return newRPCTransaction(txs[index], b.Hash(), b.NumberU64(), index)
}
// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction
type RPCTransaction struct {
BlockHash *common.Hash `json:"blockHash"`
BlockNumber *hexutil.Big `json:"blockNumber"`
From common.Address `json:"from"`
Gas hexutil.Uint64 `json:"gas"`
GasPrice *hexutil.Big `json:"gasPrice"`
Hash common.Hash `json:"hash"`
Input hexutil.Bytes `json:"input"`
Nonce hexutil.Uint64 `json:"nonce"`
To *common.Address `json:"to"`
TransactionIndex *hexutil.Uint64 `json:"transactionIndex"`
Value *hexutil.Big `json:"value"`
V *hexutil.Big `json:"v"`
R *hexutil.Big `json:"r"`
S *hexutil.Big `json:"s"`
}
// newRPCTransaction returns a transaction that will serialize to the RPC
// representation, with the given location metadata set (if available).
func newRPCTransaction(tx *types.Transaction, blockHash common.Hash, blockNumber uint64, index uint64) *RPCTransaction {
var signer types.Signer = types.FrontierSigner{}
if tx.Protected() {
signer = types.NewEIP155Signer(tx.ChainId())
}
from, _ := types.Sender(signer, tx)
v, r, s := tx.RawSignatureValues()
result := &RPCTransaction{
From: from,
Gas: hexutil.Uint64(tx.Gas()),
GasPrice: (*hexutil.Big)(tx.GasPrice()),
Hash: tx.Hash(),
Input: hexutil.Bytes(tx.Data()),
Nonce: hexutil.Uint64(tx.Nonce()),
To: tx.To(),
Value: (*hexutil.Big)(tx.Value()),
V: (*hexutil.Big)(v),
R: (*hexutil.Big)(r),
S: (*hexutil.Big)(s),
}
if blockHash != (common.Hash{}) {
result.BlockHash = &blockHash
result.BlockNumber = (*hexutil.Big)(new(big.Int).SetUint64(blockNumber))
result.TransactionIndex = (*hexutil.Uint64)(&index)
}
return result
}

View File

@ -26,7 +26,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/hashicorp/golang-lru"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/datastore/postgres"
"github.com/vulcanize/vulcanizedb/pkg/super_node/config"
@ -39,12 +39,7 @@ var (
type Backend struct {
retriever *CIDRetriever
fetcher *IPLDFetcher
resolver *IPLDResolver
db *postgres.DB
headerCache *lru.Cache // Cache for the most recent block headers
tdCache *lru.Cache // Cache for the most recent block total difficulties
numberCache *lru.Cache // Cache for the most recent block numbers
}
func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) {
@ -56,8 +51,7 @@ func NewEthBackend(db *postgres.DB, ipfsPath string) (*Backend, error) {
return &Backend{
retriever: r,
fetcher: f,
resolver: NewIPLDResolver(),
db: r.Database(),
db: db,
}, nil
}
@ -80,6 +74,9 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
}
headerCids, err := b.retriever.RetrieveHeaderCIDs(tx, number)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return nil, err
}
if err := tx.Commit(); err != nil {
@ -90,7 +87,7 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
return nil, fmt.Errorf("header at block %d is not available", number)
}
// Fetch the header IPLDs for those CIDs
headerIPLDs, err := b.fetcher.FetchHeaders(headerCids)
headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCids[0]})
if err != nil {
return nil, err
}
@ -120,6 +117,7 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) {
return td, nil
}
// GetLogs returns all the logs for the given block hash
func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
tx, err := b.db.Beginx()
if err != nil {
@ -127,6 +125,9 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
}
receiptCIDs, err := b.retriever.RetrieveRctCIDs(tx, config.ReceiptFilter{}, 0, &hash, nil)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return nil, err
}
if err := tx.Commit(); err != nil {
@ -139,14 +140,172 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
if err != nil {
return nil, err
}
receiptBytes := b.resolver.ResolveReceipts(receiptIPLDs)
logs := make([][]*types.Log, len(receiptBytes))
for i, rctRLP := range receiptBytes {
var rct types.ReceiptForStorage
if err := rlp.DecodeBytes(rctRLP, &rct); err != nil {
logs := make([][]*types.Log, len(receiptIPLDs))
for i, rctIPLD := range receiptIPLDs {
var rct types.Receipt
if err := rlp.DecodeBytes(rctIPLD.RawData(), &rct); err != nil {
return nil, err
}
logs[i] = rct.Logs
}
return logs, nil
}
// BlockByNumber returns the requested canonical block.
// Since the SuperNode can contain forked blocks, it is recommended to fetch BlockByHash as
// fetching by number can return non-deterministic results (returns the first block found at that height)
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
number := blockNumber.Int64()
var err error
if blockNumber == rpc.LatestBlockNumber {
number, err = b.retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
}
if blockNumber == rpc.PendingBlockNumber {
return nil, errPendingBlockNumber
}
// Retrieve all the CIDs for the block
headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByNumber(number)
if err != nil {
return nil, err
}
// Fetch and decode the header IPLD
headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID})
if err != nil {
return nil, err
}
var header *types.Header
if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs
uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs)
if err != nil {
return nil, err
}
var uncles []*types.Header
for _, uncleIPLD := range uncleIPLDs {
var uncle *types.Header
if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil {
return nil, err
}
uncles = append(uncles, uncle)
}
// Fetch and decode the transaction IPLDs
txIPLDs, err := b.fetcher.FetchTrxs(txCIDs)
if err != nil {
return nil, err
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx *types.Transaction
if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil {
return nil, err
}
transactions = append(transactions, tx)
}
// Fetch and decode the receipt IPLDs
rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs)
if err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt *types.Receipt
if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil {
return nil, err
}
receipts = append(receipts, receipt)
}
// Compose everything together into a complete block
return types.NewBlock(header, transactions, uncles, receipts), nil
}
// BlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full
// detail, otherwise only the transaction hash is returned.
func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
// Retrieve all the CIDs for the block
headerCID, uncleCIDs, txCIDs, rctCIDs, err := b.retriever.RetrieveBlockByHash(hash)
if err != nil {
return nil, err
}
// Fetch and decode the header IPLD
headerIPLDs, err := b.fetcher.FetchHeaders([]HeaderModel{headerCID})
if err != nil {
return nil, err
}
var header *types.Header
if err := rlp.DecodeBytes(headerIPLDs[0].RawData(), header); err != nil {
return nil, err
}
// Fetch and decode the uncle IPLDs
uncleIPLDs, err := b.fetcher.FetchUncles(uncleCIDs)
if err != nil {
return nil, err
}
var uncles []*types.Header
for _, uncleIPLD := range uncleIPLDs {
var uncle *types.Header
if err := rlp.DecodeBytes(uncleIPLD.RawData(), uncle); err != nil {
return nil, err
}
uncles = append(uncles, uncle)
}
// Fetch and decode the transaction IPLDs
txIPLDs, err := b.fetcher.FetchTrxs(txCIDs)
if err != nil {
return nil, err
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx *types.Transaction
if err := rlp.DecodeBytes(txIPLD.RawData(), tx); err != nil {
return nil, err
}
transactions = append(transactions, tx)
}
// Fetch and decode the receipt IPLDs
rctIPLDs, err := b.fetcher.FetchRcts(rctCIDs)
if err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt *types.Receipt
if err := rlp.DecodeBytes(rctIPLD.RawData(), receipt); err != nil {
return nil, err
}
receipts = append(receipts, receipt)
}
// Compose everything together into a complete block
return types.NewBlock(header, transactions, uncles, receipts), nil
}
// GetTransaction retrieves a tx by hash
// It also returns the blockhash, blocknumber, and tx index associated with the transaction
func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
pgStr := `SELECT transaction_cids.cid, transaction_cids.index, header_cids.block_hash, header_cids.block_number
FROM transaction_cids, header_cids
WHERE transaction_cids.header_id = header_cids.id
AND transaction_cids.tx_hash = $1`
var txCIDWithHeaderInfo struct {
CID string `db:"cid"`
Index int64 `db:"index"`
BlockHash string `db:"block_hash"`
BlockNumber int64 `db:"block_number"`
}
if err := b.db.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil {
return nil, common.Hash{}, 0, 0, err
}
txIPLD, err := b.fetcher.FetchTrxs([]TxModel{{CID: txCIDWithHeaderInfo.CID}})
if err != nil {
return nil, common.Hash{}, 0, 0, err
}
var transaction *types.Transaction
if err := rlp.DecodeBytes(txIPLD[0].RawData(), transaction); err != nil {
return nil, common.Hash{}, 0, 0, err
}
return transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), nil
}

View File

@ -68,7 +68,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
}
signer := types.MakeSigner(pc.chainConfig, block.Number())
transactions := block.Transactions()
for _, trx := range transactions {
for i, trx := range transactions {
// Extract to and from data from the the transactions for indexing
from, err := types.Sender(signer, trx)
if err != nil {
@ -78,6 +78,7 @@ func (pc *PayloadConverter) Convert(payload interface{}) (interface{}, error) {
Dst: handleNullAddr(trx.To()),
Src: handleNullAddr(&from),
TxHash: trx.Hash().String(),
Index: int64(i),
}
// txMeta will have same index as its corresponding trx in the convertedPayload.BlockBody
convertedPayload.TrxMetaData = append(convertedPayload.TrxMetaData, txMeta)

View File

@ -57,8 +57,7 @@ func (in *CIDIndexer) Index(cids interface{}) error {
return err
}
for _, uncle := range cidPayload.UncleCIDs {
err := in.indexUncleCID(tx, uncle)
if err != nil {
if err := in.indexUncleCID(tx, uncle, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
@ -82,27 +81,27 @@ func (in *CIDIndexer) Index(cids interface{}) error {
func (repo *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
var headerID int64
err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, parent_hash, cid, uncle, td) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, uncle, td) = ($3, $4, $5, $6)
err := tx.QueryRowx(`INSERT INTO public.header_cids (block_number, block_hash, parent_hash, cid, td) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td) = ($3, $4, $5)
RETURNING id`,
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, false, header.TotalDifficulty).Scan(&headerID)
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty).Scan(&headerID)
return headerID, err
}
func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle HeaderModel) error {
_, err := tx.Exec(`INSERT INTO public.header_cids (block_number, block_hash, parent_hash, cid, uncle) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, uncle) = ($3, $4, $5)`,
uncle.BlockNumber, uncle.BlockHash, uncle.ParentHash, uncle.CID, true)
func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int64) error {
_, err := tx.Exec(`INSERT INTO public.uncle_cids (block_hash, header_id, parent_hash, cid) VALUES ($1, $2, $3, $4)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid) = ($3, $4)`,
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID)
return err
}
func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
for _, trxCidMeta := range payload.TransactionCIDs {
var txID int64
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src) = ($3, $4, $5)
err := tx.QueryRowx(`INSERT INTO public.transaction_cids (header_id, tx_hash, cid, dst, src, index) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index) = ($3, $4, $5, $6)
RETURNING id`,
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src).Scan(&txID)
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index).Scan(&txID)
if err != nil {
return err
}

View File

@ -47,7 +47,7 @@ var _ = Describe("Indexer", func() {
err = repo.Index(mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT cid, td FROM header_cids
WHERE block_number = $1 AND uncle IS FALSE`
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string

View File

@ -109,7 +109,7 @@ func (f *IPLDFetcher) FetchHeaders(cids []HeaderModel) ([]blocks.Block, error) {
// FetchUncles fetches uncles
// It uses the f.fetchBatch method
func (f *IPLDFetcher) FetchUncles(cids []HeaderModel) ([]blocks.Block, error) {
func (f *IPLDFetcher) FetchUncles(cids []UncleModel) ([]blocks.Block, error) {
log.Debug("fetching uncle iplds")
uncleCids := make([]cid.Cid, 0, len(cids))
for _, c := range cids {

View File

@ -52,7 +52,7 @@ var (
CID: mockHeaderBlock.Cid().String(),
},
},
Uncles: []eth.HeaderModel{
Uncles: []eth.UncleModel{
{
CID: mockUncleBlock.Cid().String(),
},

View File

@ -64,12 +64,14 @@ var (
CID: "", // This is empty until we go to publish to ipfs
Src: senderAddr.Hex(),
Dst: Address.String(),
Index: 0,
TxHash: MockTransactions[0].Hash().String(),
},
{
CID: "",
Src: senderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
TxHash: MockTransactions[1].Hash().String(),
},
}
@ -78,12 +80,14 @@ var (
CID: "mockTrxCID1", // This is empty until we go to publish to ipfs
Src: senderAddr.Hex(),
Dst: Address.String(),
Index: 0,
TxHash: MockTransactions[0].Hash().String(),
},
{
CID: "mockTrxCID2",
Src: senderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
TxHash: MockTransactions[1].Hash().String(),
},
}
@ -245,11 +249,10 @@ var (
BlockHash: MockBlock.Hash().String(),
BlockNumber: MockBlock.Number().String(),
CID: "mockHeaderCID",
Uncle: false,
ParentHash: MockBlock.ParentHash().String(),
TotalDifficulty: "1337",
},
UncleCIDs: []eth2.HeaderModel{},
UncleCIDs: []eth2.UncleModel{},
TransactionCIDs: MockTrxMetaPostPublsh,
ReceiptCIDs: map[common.Hash]eth.ReceiptModel{
MockTransactions[0].Hash(): MockRctMetaPostPublish[0],
@ -275,13 +278,12 @@ var (
BlockHash: MockBlock.Hash().String(),
ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000",
CID: "mockHeaderCID",
Uncle: false,
TotalDifficulty: "1337",
},
},
Transactions: MockTrxMetaPostPublsh,
Receipts: MockRctMetaPostPublish,
Uncles: []eth2.HeaderModel{},
Uncles: []eth2.UncleModel{},
StateNodes: MockStateMetaPostPublish,
StorageNodes: []eth.StorageNodeWithStateKeyModel{
{

View File

@ -24,13 +24,21 @@ type HeaderModel struct {
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
Uncle bool `db:"uncle"`
TotalDifficulty string `db:"td"`
}
type UncleModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
}
type TxModel struct {
ID int64 `db:"id"`
HeaderID int64 `db:"header_id"`
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
Dst string `db:"dst"`

View File

@ -72,13 +72,12 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
CID: headerCid,
ParentHash: ipldPayload.Block.ParentHash().String(),
BlockNumber: ipldPayload.Block.Number().String(),
Uncle: false,
BlockHash: ipldPayload.Block.Hash().String(),
TotalDifficulty: ipldPayload.TotalDifficulty.String(),
}
// Process and publish uncles
uncleCids := make([]HeaderModel, 0, len(ipldPayload.Block.Uncles()))
uncleCids := make([]UncleModel, 0, len(ipldPayload.Block.Uncles()))
for _, uncle := range ipldPayload.Block.Uncles() {
uncleRlp, err := rlp.EncodeToBytes(uncle)
if err != nil {
@ -88,12 +87,10 @@ func (pub *IPLDPublisher) Publish(payload interface{}) (interface{}, error) {
if err != nil {
return nil, err
}
uncleCids = append(uncleCids, HeaderModel{
CID: uncleCid,
ParentHash: uncle.ParentHash.String(),
Uncle: true,
BlockHash: uncle.Hash().String(),
BlockNumber: uncle.Number.String(),
uncleCids = append(uncleCids, UncleModel{
CID: uncleCid,
ParentHash: uncle.ParentHash.String(),
BlockHash: uncle.Hash().String(),
})
}
@ -155,6 +152,7 @@ func (pub *IPLDPublisher) publishTransactions(blockBody *types.Body, trxMeta []T
for i, cid := range transactionCids {
mappedTrxCids[i] = TxModel{
CID: cid,
Index: trxMeta[i].Index,
TxHash: trxMeta[i].TxHash,
Src: trxMeta[i].Src,
Dst: trxMeta[i].Dst,

View File

@ -82,19 +82,22 @@ func (ecr *CIDRetriever) Retrieve(filter interface{}, blockNumber int64) (interf
return nil, true, err
}
if streamFilter.HeaderFilter.Uncles {
cw.Uncles, err = ecr.RetrieveUncleCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
for _, headerCID := range cw.Headers {
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return nil, true, err
}
log.Error("uncle cid retrieval error")
return nil, true, err
cw.Uncles = append(cw.Uncles, uncleCIDs...)
}
}
}
// Retrieve cached trx CIDs
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTrxCIDs(tx, streamFilter.TxFilter, blockNumber)
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
@ -155,30 +158,28 @@ func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]H
log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]HeaderModel, 0)
pgStr := `SELECT * FROM header_cids
WHERE block_number = $1 AND uncle IS FALSE`
err := tx.Select(&headers, pgStr, blockNumber)
return headers, err
WHERE block_number = $1`
return headers, tx.Select(&headers, pgStr, blockNumber)
}
// RetrieveUncleCIDs retrieves and returns all of the uncle cids at the provided blockheight
func (ecr *CIDRetriever) RetrieveUncleCIDs(tx *sqlx.Tx, blockNumber int64) ([]HeaderModel, error) {
log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]HeaderModel, 0)
pgStr := `SELECT * FROM header_cids
WHERE block_number = $1 AND uncle IS TRUE`
err := tx.Select(&headers, pgStr, blockNumber)
return headers, err
// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header
func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]UncleModel, error) {
log.Debug("retrieving uncle cids for block id ", headerID)
headers := make([]UncleModel, 0)
pgStr := `SELECT * FROM uncle_cids
WHERE header_id = $1`
return headers, tx.Select(&headers, pgStr, headerID)
}
// RetrieveTrxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTrxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, blockNumber int64) ([]TxModel, error) {
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter config.TxFilter, blockNumber int64) ([]TxModel, error) {
log.Debug("retrieving transaction cids for block ", blockNumber)
args := make([]interface{}, 0, 3)
results := make([]TxModel, 0)
pgStr := `SELECT transaction_cids.id, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid,
transaction_cids.dst, transaction_cids.src
transaction_cids.dst, transaction_cids.src, transaction_cids.index
FROM transaction_cids INNER JOIN header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
args = append(args, blockNumber)
@ -363,6 +364,124 @@ func (ecr *CIDRetriever) RetrieveGapsInData() ([]shared.Gap, error) {
return gaps, nil
}
func (ecr *CIDRetriever) Database() *postgres.DB {
return ecr.db
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String())
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, nil, nil, err
}
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs {
txIDs[i] = txCID.ID
}
rctCIDs, err := ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("rct cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
return headerCID, uncleCIDs, txCIDs, rctCIDs, tx.Commit()
}
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber)
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, nil, nil, err
}
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
if len(headerCID) < 1 {
return HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
}
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs {
txIDs[i] = txCID.ID
}
rctCIDs, err := ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("rct cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
return headerCID[0], uncleCIDs, txCIDs, rctCIDs, tx.Commit()
}
// RetrieveHeaderCIDByHash returns the header for the given block hash
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockHash.String())
pgStr := `SELECT * FROM header_cids
WHERE block_hash = $1`
var headerCID HeaderModel
return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
}
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT * FROM transaction_cids
WHERE transaction_cids.header_id = $1`
var txCIDs []TxModel
return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
}
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT * FROM receipt_cids
WHERE receipt_cids.tx_id = ANY($1::INTEGER[])`
var rctCIDs []ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
}

View File

@ -51,7 +51,7 @@ type TrieNode struct {
// Passed to CIDIndexer
type CIDPayload struct {
HeaderCID HeaderModel
UncleCIDs []HeaderModel
UncleCIDs []UncleModel
TransactionCIDs []TxModel
ReceiptCIDs map[common.Hash]ReceiptModel
StateNodeCIDs []StateNodeModel
@ -64,7 +64,7 @@ type CIDPayload struct {
type CIDWrapper struct {
BlockNumber *big.Int
Headers []HeaderModel
Uncles []HeaderModel
Uncles []UncleModel
Transactions []TxModel
Receipts []ReceiptModel
StateNodes []StateNodeModel