adjust for schema updates #159

Merged
telackey merged 2 commits from schema_updates into postgres_refactor 2021-11-26 15:23:23 +00:00
15 changed files with 229 additions and 151 deletions

View File

@ -184,12 +184,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String()
mod := models.HeaderModel{
CID: headerNode.Cid().String(),
@ -205,7 +199,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
@ -268,6 +262,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
trxID := trx.Hash().String()
var val string
if trx.Value() != nil {
val = trx.Value().String()
}
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
@ -283,6 +283,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err

View File

@ -212,7 +212,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
return headerID
}
@ -269,6 +269,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
// index tx
trx := args.txs[i]
txID := trx.Hash().String()
var val string
if trx.Value() != nil {
val = trx.Value().String()
}
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
@ -284,6 +290,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
sdi.fileWriter.upsertTransactionCID(txModel)

View File

@ -108,7 +108,7 @@ func TestFileIndexerLegacy(t *testing.T) {
setupLegacy(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -116,8 +116,8 @@ func TestFileIndexerLegacy(t *testing.T) {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
@ -126,7 +126,7 @@ func TestFileIndexerLegacy(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockBlock.Coinbase().String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}

View File

@ -52,6 +52,8 @@ var (
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
txs types.Transactions
rcts types.Receipts
mockBlock *types.Block
headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
@ -65,7 +67,7 @@ func init() {
}
mockBlock = mocks.MockBlock
txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts
txs, rcts = mocks.MockBlock.Transactions(), mocks.MockReceipts
buf := new(bytes.Buffer)
txs.EncodeIndex(0, buf)
@ -177,7 +179,7 @@ func TestFileIndexer(t *testing.T) {
setup(t)
dumpData(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -185,8 +187,8 @@ func TestFileIndexer(t *testing.T) {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = sqlxdb.QueryRowx(pgStr, mocks.BlockNumber.Uint64()).StructScan(header)
@ -197,7 +199,7 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
@ -231,6 +233,10 @@ func TestFileIndexer(t *testing.T) {
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
// and published
type txResult struct {
TxType uint8 `db:"tx_type"`
Value string
}
for _, c := range trxs {
dc, err := cid.Decode(c)
if err != nil {
@ -243,47 +249,59 @@ func TestFileIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
txTypeAndValueStr := `SELECT tx_type, value FROM eth.transaction_cids WHERE cid = $1`
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[0].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[0].Value().String(), txRes.Value)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[1].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[1].Value().String(), txRes.Value)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != txs[2].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[2].Value().String(), txRes.Value)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
if txRes.TxType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txRes.TxType)
}
if txRes.Value != txs[3].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[3].Value().String(), txRes.Value)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
@ -307,13 +325,16 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
case trx5CID.String():
test_helpers.ExpectEqual(t, data, tx5)
var txType *uint8
err = sqlxdb.Get(&txType, txTypePgStr, c)
txRes := new(txResult)
err = sqlxdb.QueryRowx(txTypeAndValueStr, c).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if *txType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
if txRes.TxType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", txRes.TxType)
}
if txRes.Value != txs[4].Value().String() {
t.Fatalf("expected tx value %s got %s", txs[4].Value().String(), txRes.Value)
}
}
}

View File

@ -124,18 +124,14 @@ const (
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, %s);\n"
headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, NULL);\n"
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, '%s');\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s');\n"
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d);\n"
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, " +
"value) VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d, '%s');\n"
alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"
@ -191,42 +187,40 @@ func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, strin
}
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
var stmt string
if header.BaseFee == nil {
stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1)
} else {
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, *header.BaseFee)
}
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase)
sqw.stmts <- []byte(stmt)
indexerMetrics.blocks.Inc(1)
}
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey))
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.MhKey))
}
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type))
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value))
indexerMetrics.transactions.Inc(1)
}
func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys)))
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
formatPostgresStringArray(accessListElement.StorageKeys)))
indexerMetrics.accessListEntries.Inc(1)
}
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot))
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey,
rct.PostState, rct.PostStatus, rct.LogRoot))
indexerMetrics.receipts.Inc(1)
}
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
for _, l := range logs {
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data))
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3, l.Data))
indexerMetrics.logs.Inc(1)
}
}

View File

@ -256,7 +256,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
TxRoot: header.TxHash.String(),
UncleRoot: header.UncleHash.String(),
Timestamp: header.Time,
BaseFee: baseFee,
Coinbase: header.Coinbase.String(),
})
}
@ -316,6 +316,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
// index tx
trx := args.txs[i]
txID := trx.Hash().String()
var val string
if trx.Value() != nil {
val = trx.Value().String()
}
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
@ -331,6 +337,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
CID: txNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
Type: trx.Type(),
Value: val,
}
if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
return err

View File

@ -63,7 +63,7 @@ func TestPGXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -72,18 +72,18 @@ func TestPGXIndexerLegacy(t *testing.T) {
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *int64 `db:"base_fee"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan(
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee)
&header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.Coinbase)
require.NoError(t, err)
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}

View File

@ -150,7 +150,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, cast(base_fee AS TEXT)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -158,8 +158,8 @@ func TestPGXIndexer(t *testing.T) {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
@ -167,14 +167,14 @@ func TestPGXIndexer(t *testing.T) {
&header.TD,
&header.Reward,
&header.BlockHash,
&header.BaseFee)
&header.Coinbase)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
@ -207,6 +207,11 @@ func TestPGXIndexer(t *testing.T) {
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
// and published
transactions := mocks.MockBlock.Transactions()
type txResult struct {
TxType uint8 `db:"tx_type"`
Value string
}
for _, c := range trxs {
dc, err := cid.Decode(c)
if err != nil {
@ -219,47 +224,59 @@ func TestPGXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
txTypeAndValueStr := `SELECT tx_type, CAST(value as TEXT) FROM eth.transaction_cids WHERE cid = $1`
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[0].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[1].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[2].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
if txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
if txRes.TxType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txRes.TxType)
}
if txRes.Value != transactions[3].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
@ -283,13 +300,16 @@ func TestPGXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
case trx5CID.String():
test_helpers.ExpectEqual(t, data, tx5)
var txType *uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
if *txType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
if txRes.TxType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", txRes.TxType)
}
if txRes.Value != transactions[4].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[4].Value().String(), txRes.Value)
}
}
}

View File

@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
var DefaultConfig = Config{
Hostname: "localhost",
Port: 5432,
DatabaseName: "vulcanize_testing",
DatabaseName: "vulcanize_test",
Username: "postgres",
Password: "password",
Password: "",
}
// Config holds params for a Postgres db

View File

@ -37,9 +37,9 @@ type DB struct {
// InsertHeaderStm satisfies the sql.Statements interface
func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}
// InsertUncleStm satisfies the sql.Statements interface
@ -50,7 +50,7 @@ func (db *DB) InsertUncleStm() string {
// InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_hash) DO NOTHING`
}

View File

@ -73,7 +73,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacySQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -82,7 +82,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *int64 `db:"base_fee"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
@ -91,7 +91,7 @@ func TestSQLXIndexerLegacy(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
test_helpers.ExpectEqual(t, header.Coinbase, legacyData.MockHeader.Coinbase.String())
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}

View File

@ -177,7 +177,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -185,8 +185,8 @@ func TestSQLXIndexer(t *testing.T) {
CID string
TD string
Reward string
BlockHash string `db:"block_hash"`
BaseFee *string `db:"base_fee"`
BlockHash string `db:"block_hash"`
Coinbase string `db:"coinbase"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
@ -196,7 +196,7 @@ func TestSQLXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
test_helpers.ExpectEqual(t, header.Coinbase, mocks.MockHeader.Coinbase.String())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
@ -229,6 +229,11 @@ func TestSQLXIndexer(t *testing.T) {
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
// and published
transactions := mocks.MockBlock.Transactions()
type txResult struct {
TxType uint8 `db:"tx_type"`
Value string
}
for _, c := range trxs {
dc, err := cid.Decode(c)
if err != nil {
@ -241,47 +246,59 @@ func TestSQLXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
txTypeAndValueStr := `SELECT tx_type, value FROM eth.transaction_cids WHERE cid = $1`
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[0].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[1].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txType)
if txRes.TxType != 0 {
t.Fatalf("expected LegacyTxType (0), got %d", txRes.TxType)
}
if txRes.Value != transactions[2].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
if txRes.TxType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txRes.TxType)
}
if txRes.Value != transactions[3].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
@ -305,13 +322,16 @@ func TestSQLXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
case trx5CID.String():
test_helpers.ExpectEqual(t, data, tx5)
var txType *uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).(*sqlx.Row).StructScan(txRes)
if err != nil {
t.Fatal(err)
}
if *txType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
if txRes.TxType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", txRes.TxType)
}
if txRes.Value != transactions[4].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[4].Value().String(), txRes.Value)
}
}
}

View File

@ -45,14 +45,15 @@ func (w *Writer) Close() error {
}
/*
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
*/
func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertHeaderStm(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(),
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
header.Timestamp, header.MhKey, 1, header.Coinbase)
if err != nil {
return fmt.Errorf("error upserting header_cids entry: %v", err)
}
@ -74,12 +75,13 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
}
/*
INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type, value) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_hash) DO NOTHING
*/
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index,
transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
if err != nil {
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
}
@ -122,7 +124,8 @@ ON CONFLICT (rct_id, index) DO NOTHING
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs {
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2,
log.Topic3, log.Data)
if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err)
}
@ -154,7 +157,8 @@ ON CONFLICT (header_id, state_path) DO NOTHING
*/
func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
_, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash,
stateAccount.StorageRoot)
if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err)
}
@ -171,7 +175,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
storageKey = storageCID.StorageKey
}
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType,
true, storageCID.MhKey)
if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err)
}

View File

@ -49,6 +49,7 @@ var (
Difficulty: big.NewInt(5000000),
Extra: []byte{},
BaseFee: big.NewInt(params.InitialBaseFee),
Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"),
}
MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestConfig, BlockNumber)
MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts, new(trie.Trie))
@ -216,6 +217,7 @@ func NewLegacyData() *LegacyData {
ReceiptHash: common.HexToHash("0x0"),
Difficulty: big.NewInt(5000000),
Extra: []byte{},
Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476888"),
}
mockTransactions, mockReceipts, senderAddr := createLegacyTransactionsAndReceipts(config, blockNumber)
@ -306,7 +308,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
GasPrice: big.NewInt(100),
Gas: 50,
To: &AnotherAddress,
Value: big.NewInt(1000),
Value: big.NewInt(999),
Data: []byte{},
AccessList: types.AccessList{
AccessListEntry1,

View File

@ -26,22 +26,22 @@ type IPLDModel struct {
// HeaderModel is the db model for eth.header_cids
type HeaderModel struct {
BlockNumber string `db:"block_number"`
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
TotalDifficulty string `db:"td"`
NodeID string `db:"node_id"`
Reward string `db:"reward"`
StateRoot string `db:"state_root"`
UncleRoot string `db:"uncle_root"`
TxRoot string `db:"tx_root"`
RctRoot string `db:"receipt_root"`
Bloom []byte `db:"bloom"`
Timestamp uint64 `db:"timestamp"`
TimesValidated int64 `db:"times_validated"`
BaseFee *string `db:"base_fee"`
BlockNumber string `db:"block_number"`
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
TotalDifficulty string `db:"td"`
NodeID string `db:"node_id"`
Reward string `db:"reward"`
StateRoot string `db:"state_root"`
UncleRoot string `db:"uncle_root"`
TxRoot string `db:"tx_root"`
RctRoot string `db:"receipt_root"`
Bloom []byte `db:"bloom"`
Timestamp uint64 `db:"timestamp"`
TimesValidated int64 `db:"times_validated"`
Coinbase string `db:"coinbase"`
}
// UncleModel is the db model for eth.uncle_cids
@ -65,6 +65,7 @@ type TxModel struct {
Src string `db:"src"`
Data []byte `db:"tx_data"`
Type uint8 `db:"tx_type"`
Value string `db:"value"`
}
// AccessListElementModel is the db model for eth.access_list_entry