Schema updates #156
@ -184,10 +184,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
||||||
tx.cacheIPLD(headerNode)
|
tx.cacheIPLD(headerNode)
|
||||||
|
|
||||||
var baseFee *int64
|
var baseFee *string
|
||||||
if header.BaseFee != nil {
|
if header.BaseFee != nil {
|
||||||
baseFee = new(int64)
|
baseFee = new(string)
|
||||||
*baseFee = header.BaseFee.Int64()
|
*baseFee = header.BaseFee.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
headerID := header.Hash().String()
|
headerID := header.Hash().String()
|
||||||
|
@ -31,3 +31,15 @@ type Config struct {
|
|||||||
func (c Config) Type() shared.DBType {
|
func (c Config) Type() shared.DBType {
|
||||||
return shared.FILE
|
return shared.FILE
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestConfig config for unit tests
|
||||||
|
var TestConfig = Config{
|
||||||
|
FilePath: "./statediffing_test_file.sql",
|
||||||
|
NodeInfo: node.Info{
|
||||||
|
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
|
||||||
|
NetworkID: "1",
|
||||||
|
ChainID: 1,
|
||||||
|
ID: "mockNodeID",
|
||||||
|
ClientName: "go-ethereum",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
@ -53,7 +53,7 @@ var (
|
|||||||
|
|
||||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
||||||
type StateDiffIndexer struct {
|
type StateDiffIndexer struct {
|
||||||
writer *SQLWriter
|
fileWriter *SQLWriter
|
||||||
chainConfig *params.ChainConfig
|
chainConfig *params.ChainConfig
|
||||||
nodeID string
|
nodeID string
|
||||||
wg *sync.WaitGroup
|
wg *sync.WaitGroup
|
||||||
@ -79,7 +79,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
|||||||
w.upsertNode(config.NodeInfo)
|
w.upsertNode(config.NodeInfo)
|
||||||
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
|
w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{})
|
||||||
return &StateDiffIndexer{
|
return &StateDiffIndexer{
|
||||||
writer: w,
|
fileWriter: w,
|
||||||
chainConfig: chainConfig,
|
chainConfig: chainConfig,
|
||||||
nodeID: config.NodeInfo.ID,
|
nodeID: config.NodeInfo.ID,
|
||||||
wg: wg,
|
wg: wg,
|
||||||
@ -133,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||||
t = time.Now()
|
t = time.Now()
|
||||||
sdi.writer.Flush()
|
sdi.fileWriter.Flush()
|
||||||
tDiff = time.Since(t)
|
tDiff = time.Since(t)
|
||||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||||
@ -189,15 +189,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
// processHeader write a header IPLD insert SQL stmt to a file
|
// processHeader write a header IPLD insert SQL stmt to a file
|
||||||
// it returns the headerID
|
// it returns the headerID
|
||||||
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
|
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
|
||||||
sdi.writer.upsertIPLDNode(headerNode)
|
sdi.fileWriter.upsertIPLDNode(headerNode)
|
||||||
|
|
||||||
var baseFee *int64
|
var baseFee *string
|
||||||
if header.BaseFee != nil {
|
if header.BaseFee != nil {
|
||||||
baseFee = new(int64)
|
baseFee = new(string)
|
||||||
*baseFee = header.BaseFee.Int64()
|
*baseFee = header.BaseFee.String()
|
||||||
}
|
}
|
||||||
headerID := header.Hash().String()
|
headerID := header.Hash().String()
|
||||||
sdi.writer.upsertHeaderCID(models.HeaderModel{
|
sdi.fileWriter.upsertHeaderCID(models.HeaderModel{
|
||||||
NodeID: sdi.nodeID,
|
NodeID: sdi.nodeID,
|
||||||
CID: headerNode.Cid().String(),
|
CID: headerNode.Cid().String(),
|
||||||
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
|
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
|
||||||
@ -221,7 +221,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node
|
|||||||
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
|
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
|
||||||
// publish and index uncles
|
// publish and index uncles
|
||||||
for _, uncleNode := range uncleNodes {
|
for _, uncleNode := range uncleNodes {
|
||||||
sdi.writer.upsertIPLDNode(uncleNode)
|
sdi.fileWriter.upsertIPLDNode(uncleNode)
|
||||||
var uncleReward *big.Int
|
var uncleReward *big.Int
|
||||||
// in PoA networks uncle reward is 0
|
// in PoA networks uncle reward is 0
|
||||||
if sdi.chainConfig.Clique != nil {
|
if sdi.chainConfig.Clique != nil {
|
||||||
@ -229,7 +229,7 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64,
|
|||||||
} else {
|
} else {
|
||||||
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
|
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
|
||||||
}
|
}
|
||||||
sdi.writer.upsertUncleCID(models.UncleModel{
|
sdi.fileWriter.upsertUncleCID(models.UncleModel{
|
||||||
HeaderID: headerID,
|
HeaderID: headerID,
|
||||||
CID: uncleNode.Cid().String(),
|
CID: uncleNode.Cid().String(),
|
||||||
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
|
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
|
||||||
@ -261,10 +261,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
|||||||
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
|
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
|
||||||
for i, receipt := range args.receipts {
|
for i, receipt := range args.receipts {
|
||||||
for _, logTrieNode := range args.logTrieNodes[i] {
|
for _, logTrieNode := range args.logTrieNodes[i] {
|
||||||
sdi.writer.upsertIPLDNode(logTrieNode)
|
sdi.fileWriter.upsertIPLDNode(logTrieNode)
|
||||||
}
|
}
|
||||||
txNode := args.txNodes[i]
|
txNode := args.txNodes[i]
|
||||||
sdi.writer.upsertIPLDNode(txNode)
|
sdi.fileWriter.upsertIPLDNode(txNode)
|
||||||
|
|
||||||
// index tx
|
// index tx
|
||||||
trx := args.txs[i]
|
trx := args.txs[i]
|
||||||
@ -285,7 +285,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
|||||||
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
|
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
|
||||||
Type: trx.Type(),
|
Type: trx.Type(),
|
||||||
}
|
}
|
||||||
sdi.writer.upsertTransactionCID(txModel)
|
sdi.fileWriter.upsertTransactionCID(txModel)
|
||||||
|
|
||||||
// index access list if this is one
|
// index access list if this is one
|
||||||
for j, accessListElement := range trx.AccessList() {
|
for j, accessListElement := range trx.AccessList() {
|
||||||
@ -299,7 +299,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
|||||||
Address: accessListElement.Address.Hex(),
|
Address: accessListElement.Address.Hex(),
|
||||||
StorageKeys: storageKeys,
|
StorageKeys: storageKeys,
|
||||||
}
|
}
|
||||||
sdi.writer.upsertAccessListElement(accessListElementModel)
|
sdi.fileWriter.upsertAccessListElement(accessListElementModel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is the contract address if this receipt is for a contract creation tx
|
// this is the contract address if this receipt is for a contract creation tx
|
||||||
@ -327,7 +327,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
|||||||
} else {
|
} else {
|
||||||
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
|
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
|
||||||
}
|
}
|
||||||
sdi.writer.upsertReceiptCID(rctModel)
|
sdi.fileWriter.upsertReceiptCID(rctModel)
|
||||||
|
|
||||||
// index logs
|
// index logs
|
||||||
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
|
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
|
||||||
@ -354,13 +354,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
|||||||
Topic3: topicSet[3],
|
Topic3: topicSet[3],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sdi.writer.upsertLogCID(logDataSet)
|
sdi.fileWriter.upsertLogCID(logDataSet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// publish trie nodes, these aren't indexed directly
|
// publish trie nodes, these aren't indexed directly
|
||||||
for i, n := range args.txTrieNodes {
|
for i, n := range args.txTrieNodes {
|
||||||
sdi.writer.upsertIPLDNode(n)
|
sdi.fileWriter.upsertIPLDNode(n)
|
||||||
sdi.writer.upsertIPLDNode(args.rctTrieNodes[i])
|
sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -380,10 +380,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
MhKey: shared.RemovedNodeMhKey,
|
MhKey: shared.RemovedNodeMhKey,
|
||||||
NodeType: stateNode.NodeType.Int(),
|
NodeType: stateNode.NodeType.Int(),
|
||||||
}
|
}
|
||||||
sdi.writer.upsertStateCID(stateModel)
|
sdi.fileWriter.upsertStateCID(stateModel)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
|
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
|
||||||
}
|
}
|
||||||
@ -396,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
NodeType: stateNode.NodeType.Int(),
|
NodeType: stateNode.NodeType.Int(),
|
||||||
}
|
}
|
||||||
// index the state node
|
// index the state node
|
||||||
sdi.writer.upsertStateCID(stateModel)
|
sdi.fileWriter.upsertStateCID(stateModel)
|
||||||
// if we have a leaf, decode and index the account data
|
// if we have a leaf, decode and index the account data
|
||||||
if stateNode.NodeType == sdtypes.Leaf {
|
if stateNode.NodeType == sdtypes.Leaf {
|
||||||
var i []interface{}
|
var i []interface{}
|
||||||
@ -418,7 +418,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
CodeHash: account.CodeHash,
|
CodeHash: account.CodeHash,
|
||||||
StorageRoot: account.Root.String(),
|
StorageRoot: account.Root.String(),
|
||||||
}
|
}
|
||||||
sdi.writer.upsertStateAccount(accountModel)
|
sdi.fileWriter.upsertStateAccount(accountModel)
|
||||||
}
|
}
|
||||||
// if there are any storage nodes associated with this node, publish and index them
|
// if there are any storage nodes associated with this node, publish and index them
|
||||||
for _, storageNode := range stateNode.StorageNodes {
|
for _, storageNode := range stateNode.StorageNodes {
|
||||||
@ -434,10 +434,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
MhKey: shared.RemovedNodeMhKey,
|
MhKey: shared.RemovedNodeMhKey,
|
||||||
NodeType: storageNode.NodeType.Int(),
|
NodeType: storageNode.NodeType.Int(),
|
||||||
}
|
}
|
||||||
sdi.writer.upsertStorageCID(storageModel)
|
sdi.fileWriter.upsertStorageCID(storageModel)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
|
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
|
||||||
}
|
}
|
||||||
@ -450,7 +450,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
|
|||||||
MhKey: storageMhKey,
|
MhKey: storageMhKey,
|
||||||
NodeType: storageNode.NodeType.Int(),
|
NodeType: storageNode.NodeType.Int(),
|
||||||
}
|
}
|
||||||
sdi.writer.upsertStorageCID(storageModel)
|
sdi.fileWriter.upsertStorageCID(storageModel)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -463,11 +463,11 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
||||||
}
|
}
|
||||||
sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
|
sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.writer.Close()
|
return sdi.fileWriter.Close()
|
||||||
}
|
}
|
||||||
|
132
statediff/indexer/database/file/indexer_legacy_test.go
Normal file
132
statediff/indexer/database/file/indexer_legacy_test.go
Normal file
@ -0,0 +1,132 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package file_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
legacyData = mocks.NewLegacyData()
|
||||||
|
mockLegacyBlock *types.Block
|
||||||
|
legacyHeaderCID cid.Cid
|
||||||
|
)
|
||||||
|
|
||||||
|
func setupLegacy(t *testing.T) {
|
||||||
|
mockLegacyBlock = legacyData.MockBlock
|
||||||
|
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
|
||||||
|
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||||
|
err := os.Remove(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
var tx interfaces.Batch
|
||||||
|
tx, err = ind.PushBlock(
|
||||||
|
mockLegacyBlock,
|
||||||
|
legacyData.MockReceipts,
|
||||||
|
legacyData.MockBlock.Difficulty())
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
if err := tx.Submit(err); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := ind.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for _, node := range legacyData.StateDiffs {
|
||||||
|
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
|
||||||
|
|
||||||
|
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||||
|
|
||||||
|
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func dumpData(t *testing.T) {
|
||||||
|
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func tearDown(t *testing.T) {
|
||||||
|
file.TearDownDB(t, sqlxdb)
|
||||||
|
err := os.Remove(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
err = sqlxdb.Close()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func expectTrue(t *testing.T, value bool) {
|
||||||
|
if !value {
|
||||||
|
t.Fatalf("Assertion failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileIndexerLegacy(t *testing.T) {
|
||||||
|
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||||
|
setupLegacy(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
pgStr := `SELECT cid, td, reward, block_hash, base_fee
|
||||||
|
FROM eth.header_cids
|
||||||
|
WHERE block_number = $1`
|
||||||
|
// check header was properly indexed
|
||||||
|
type res struct {
|
||||||
|
CID string
|
||||||
|
TD string
|
||||||
|
Reward string
|
||||||
|
BlockHash string `db:"block_hash"`
|
||||||
|
BaseFee *string `db:"base_fee"`
|
||||||
|
}
|
||||||
|
header := new(res)
|
||||||
|
err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
|
||||||
|
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
|
||||||
|
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
|
||||||
|
require.Nil(t, legacyData.MockHeader.BaseFee)
|
||||||
|
require.Nil(t, header.BaseFee)
|
||||||
|
})
|
||||||
|
}
|
634
statediff/indexer/database/file/indexer_test.go
Normal file
634
statediff/indexer/database/file/indexer_test.go
Normal file
@ -0,0 +1,634 @@
|
|||||||
|
// VulcanizeDB
|
||||||
|
// Copyright © 2019 Vulcanize
|
||||||
|
|
||||||
|
// This program is free software: you can redistribute it and/or modify
|
||||||
|
// it under the terms of the GNU Affero General Public License as published by
|
||||||
|
// the Free Software Foundation, either version 3 of the License, or
|
||||||
|
// (at your option) any later version.
|
||||||
|
|
||||||
|
// This program is distributed in the hope that it will be useful,
|
||||||
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
// GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
// You should have received a copy of the GNU Affero General Public License
|
||||||
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
package file_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-cid"
|
||||||
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
"github.com/multiformats/go-multihash"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
sqlxdb *sqlx.DB
|
||||||
|
err error
|
||||||
|
ind interfaces.StateDiffIndexer
|
||||||
|
ipfsPgGet = `SELECT data FROM public.blocks
|
||||||
|
WHERE key = $1`
|
||||||
|
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
|
||||||
|
mockBlock *types.Block
|
||||||
|
headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
|
||||||
|
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
|
||||||
|
state1CID, state2CID, storageCID cid.Cid
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
if os.Getenv("MODE") != "statediff" {
|
||||||
|
fmt.Println("Skipping statediff test")
|
||||||
|
os.Exit(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
mockBlock = mocks.MockBlock
|
||||||
|
txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts
|
||||||
|
|
||||||
|
buf := new(bytes.Buffer)
|
||||||
|
txs.EncodeIndex(0, buf)
|
||||||
|
tx1 = make([]byte, buf.Len())
|
||||||
|
copy(tx1, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
txs.EncodeIndex(1, buf)
|
||||||
|
tx2 = make([]byte, buf.Len())
|
||||||
|
copy(tx2, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
txs.EncodeIndex(2, buf)
|
||||||
|
tx3 = make([]byte, buf.Len())
|
||||||
|
copy(tx3, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
txs.EncodeIndex(3, buf)
|
||||||
|
tx4 = make([]byte, buf.Len())
|
||||||
|
copy(tx4, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
txs.EncodeIndex(4, buf)
|
||||||
|
tx5 = make([]byte, buf.Len())
|
||||||
|
copy(tx5, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
rcts.EncodeIndex(0, buf)
|
||||||
|
rct1 = make([]byte, buf.Len())
|
||||||
|
copy(rct1, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
rcts.EncodeIndex(1, buf)
|
||||||
|
rct2 = make([]byte, buf.Len())
|
||||||
|
copy(rct2, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
rcts.EncodeIndex(2, buf)
|
||||||
|
rct3 = make([]byte, buf.Len())
|
||||||
|
copy(rct3, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
rcts.EncodeIndex(3, buf)
|
||||||
|
rct4 = make([]byte, buf.Len())
|
||||||
|
copy(rct4, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
rcts.EncodeIndex(4, buf)
|
||||||
|
rct5 = make([]byte, buf.Len())
|
||||||
|
copy(rct5, buf.Bytes())
|
||||||
|
buf.Reset()
|
||||||
|
|
||||||
|
headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockHeaderRlp, multihash.KECCAK_256)
|
||||||
|
trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx1, multihash.KECCAK_256)
|
||||||
|
trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx2, multihash.KECCAK_256)
|
||||||
|
trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx3, multihash.KECCAK_256)
|
||||||
|
trx4CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx4, multihash.KECCAK_256)
|
||||||
|
trx5CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx5, multihash.KECCAK_256)
|
||||||
|
rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct1, multihash.KECCAK_256)
|
||||||
|
rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct2, multihash.KECCAK_256)
|
||||||
|
rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct3, multihash.KECCAK_256)
|
||||||
|
rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256)
|
||||||
|
rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256)
|
||||||
|
state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.ContractLeafNode, multihash.KECCAK_256)
|
||||||
|
state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.AccountLeafNode, multihash.KECCAK_256)
|
||||||
|
storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256)
|
||||||
|
}
|
||||||
|
|
||||||
|
func setup(t *testing.T) {
|
||||||
|
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||||
|
err := os.Remove(file.TestConfig.FilePath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
|
||||||
|
require.NoError(t, err)
|
||||||
|
var tx interfaces.Batch
|
||||||
|
tx, err = ind.PushBlock(
|
||||||
|
mockBlock,
|
||||||
|
mocks.MockReceipts,
|
||||||
|
mocks.MockBlock.Difficulty())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := tx.Submit(err); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := ind.Close(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for _, node := range mocks.StateDiffs {
|
||||||
|
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
||||||
|
|
||||||
|
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||||
|
|
||||||
|
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFileIndexer(t *testing.T) {
|
||||||
|
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
pgStr := `SELECT cid, td, reward, block_hash, base_fee
|
||||||
|
FROM eth.header_cids
|
||||||
|
WHERE block_number = $1`
|
||||||
|
// check header was properly indexed
|
||||||
|
type res struct {
|
||||||
|
CID string
|
||||||
|
TD string
|
||||||
|
Reward string
|
||||||
|
BlockHash string `db:"block_hash"`
|
||||||
|
BaseFee *string `db:"base_fee"`
|
||||||
|
}
|
||||||
|
header := new(res)
|
||||||
|
err = sqlxdb.QueryRowx(pgStr, mocks.BlockNumber.Uint64()).StructScan(header)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
|
||||||
|
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
|
||||||
|
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
|
||||||
|
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
|
||||||
|
dc, err := cid.Decode(header.CID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
var data []byte
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp)
|
||||||
|
})
|
||||||
|
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
|
||||||
|
// check that txs were properly indexed
|
||||||
|
trxs := make([]string, 0)
|
||||||
|
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
|
||||||
|
WHERE header_cids.block_number = $1`
|
||||||
|
err = sqlxdb.Select(&trxs, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(trxs), 5)
|
||||||
|
expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String()))
|
||||||
|
expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String()))
|
||||||
|
expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String()))
|
||||||
|
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
|
||||||
|
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
|
||||||
|
// and published
|
||||||
|
for _, c := range trxs {
|
||||||
|
dc, err := cid.Decode(c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
var data []byte
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
|
||||||
|
switch c {
|
||||||
|
case trx1CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, tx1)
|
||||||
|
var txType uint8
|
||||||
|
err = sqlxdb.Get(&txType, txTypePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if txType != 0 {
|
||||||
|
t.Fatalf("expected LegacyTxType (0), got %d", txType)
|
||||||
|
}
|
||||||
|
case trx2CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, tx2)
|
||||||
|
var txType uint8
|
||||||
|
err = sqlxdb.Get(&txType, txTypePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if txType != 0 {
|
||||||
|
t.Fatalf("expected LegacyTxType (0), got %d", txType)
|
||||||
|
}
|
||||||
|
case trx3CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, tx3)
|
||||||
|
var txType uint8
|
||||||
|
err = sqlxdb.Get(&txType, txTypePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if txType != 0 {
|
||||||
|
t.Fatalf("expected LegacyTxType (0), got %d", txType)
|
||||||
|
}
|
||||||
|
case trx4CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, tx4)
|
||||||
|
var txType uint8
|
||||||
|
err = sqlxdb.Get(&txType, txTypePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if txType != types.AccessListTxType {
|
||||||
|
t.Fatalf("expected AccessListTxType (1), got %d", txType)
|
||||||
|
}
|
||||||
|
accessListElementModels := make([]models.AccessListElementModel, 0)
|
||||||
|
pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC`
|
||||||
|
err = sqlxdb.Select(&accessListElementModels, pgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(accessListElementModels) != 2 {
|
||||||
|
t.Fatalf("expected two access list entries, got %d", len(accessListElementModels))
|
||||||
|
}
|
||||||
|
model1 := models.AccessListElementModel{
|
||||||
|
Index: accessListElementModels[0].Index,
|
||||||
|
Address: accessListElementModels[0].Address,
|
||||||
|
}
|
||||||
|
model2 := models.AccessListElementModel{
|
||||||
|
Index: accessListElementModels[1].Index,
|
||||||
|
Address: accessListElementModels[1].Address,
|
||||||
|
StorageKeys: accessListElementModels[1].StorageKeys,
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model)
|
||||||
|
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
|
||||||
|
case trx5CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, tx5)
|
||||||
|
var txType *uint8
|
||||||
|
err = sqlxdb.Get(&txType, txTypePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if *txType != types.DynamicFeeTxType {
|
||||||
|
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
|
||||||
|
rcts := make([]string, 0)
|
||||||
|
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
|
||||||
|
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
|
||||||
|
AND transaction_cids.header_id = header_cids.block_hash
|
||||||
|
AND header_cids.block_number = $1
|
||||||
|
ORDER BY transaction_cids.index`
|
||||||
|
err = sqlxdb.Select(&rcts, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type logIPLD struct {
|
||||||
|
Index int `db:"index"`
|
||||||
|
Address string `db:"address"`
|
||||||
|
Data []byte `db:"data"`
|
||||||
|
Topic0 string `db:"topic0"`
|
||||||
|
Topic1 string `db:"topic1"`
|
||||||
|
}
|
||||||
|
for i := range rcts {
|
||||||
|
results := make([]logIPLD, 0)
|
||||||
|
pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids
|
||||||
|
INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id)
|
||||||
|
INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key)
|
||||||
|
WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC`
|
||||||
|
err = sqlxdb.Select(&results, pgStr, rcts[i])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// expecting MockLog1 and MockLog2 for mockReceipt4
|
||||||
|
expectedLogs := mocks.MockReceipts[i].Logs
|
||||||
|
test_helpers.ExpectEqual(t, len(results), len(expectedLogs))
|
||||||
|
|
||||||
|
var nodeElements []interface{}
|
||||||
|
for idx, r := range results {
|
||||||
|
// Decode the log leaf node.
|
||||||
|
err = rlp.DecodeBytes(r.Data, &nodeElements)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
logRaw, err := rlp.EncodeToBytes(expectedLogs[idx])
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// 2nd element of the leaf node contains the encoded log data.
|
||||||
|
test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
|
||||||
|
// check receipts were properly indexed
|
||||||
|
rcts := make([]string, 0)
|
||||||
|
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
|
||||||
|
WHERE receipt_cids.tx_id = transaction_cids.tx_hash
|
||||||
|
AND transaction_cids.header_id = header_cids.block_hash
|
||||||
|
AND header_cids.block_number = $1 order by transaction_cids.index`
|
||||||
|
err = sqlxdb.Select(&rcts, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(rcts), 5)
|
||||||
|
|
||||||
|
for idx, rctLeafCID := range rcts {
|
||||||
|
result := make([]models.IPLDModel, 0)
|
||||||
|
pgStr = `SELECT data
|
||||||
|
FROM eth.receipt_cids
|
||||||
|
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = public.blocks.key)
|
||||||
|
WHERE receipt_cids.leaf_cid = $1`
|
||||||
|
err = sqlxdb.Select(&result, pgStr, rctLeafCID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decode the log leaf node.
|
||||||
|
var nodeElements []interface{}
|
||||||
|
err = rlp.DecodeBytes(result[0].Data, &nodeElements)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
expectedRct, err := mocks.MockReceipts[idx].MarshalBinary()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte))
|
||||||
|
}
|
||||||
|
|
||||||
|
// and published
|
||||||
|
for _, c := range rcts {
|
||||||
|
dc, err := cid.Decode(c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
var data []byte
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1`
|
||||||
|
switch c {
|
||||||
|
case rct1CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, rct1)
|
||||||
|
var postStatus uint64
|
||||||
|
pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1`
|
||||||
|
err = sqlxdb.Get(&postStatus, pgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus)
|
||||||
|
case rct2CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, rct2)
|
||||||
|
var postState string
|
||||||
|
err = sqlxdb.Get(&postState, postStatePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1)
|
||||||
|
case rct3CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, rct3)
|
||||||
|
var postState string
|
||||||
|
err = sqlxdb.Get(&postState, postStatePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2)
|
||||||
|
case rct4CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, rct4)
|
||||||
|
var postState string
|
||||||
|
err = sqlxdb.Get(&postState, postStatePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3)
|
||||||
|
case rct5CID.String():
|
||||||
|
test_helpers.ExpectEqual(t, data, rct5)
|
||||||
|
var postState string
|
||||||
|
err = sqlxdb.Get(&postState, postStatePgStr, c)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
|
||||||
|
// check that state nodes were properly indexed and published
|
||||||
|
stateNodes := make([]models.StateNodeModel, 0)
|
||||||
|
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||||
|
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
|
||||||
|
WHERE header_cids.block_number = $1 AND node_type != 3`
|
||||||
|
err = sqlxdb.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(stateNodes), 2)
|
||||||
|
for _, stateNode := range stateNodes {
|
||||||
|
var data []byte
|
||||||
|
dc, err := cid.Decode(stateNode.CID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2`
|
||||||
|
var account models.StateAccountModel
|
||||||
|
err = sqlxdb.Get(&account, pgStr, stateNode.HeaderID, stateNode.Path)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if stateNode.CID == state1CID.String() {
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.NodeType, 2)
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex())
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'})
|
||||||
|
test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode)
|
||||||
|
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
|
||||||
|
HeaderID: account.HeaderID,
|
||||||
|
StatePath: stateNode.Path,
|
||||||
|
Balance: "0",
|
||||||
|
CodeHash: mocks.ContractCodeHash.Bytes(),
|
||||||
|
StorageRoot: mocks.ContractRoot,
|
||||||
|
Nonce: 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if stateNode.CID == state2CID.String() {
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.NodeType, 2)
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex())
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'})
|
||||||
|
test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode)
|
||||||
|
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
|
||||||
|
HeaderID: account.HeaderID,
|
||||||
|
StatePath: stateNode.Path,
|
||||||
|
Balance: "1000",
|
||||||
|
CodeHash: mocks.AccountCodeHash.Bytes(),
|
||||||
|
StorageRoot: mocks.AccountRoot,
|
||||||
|
Nonce: 0,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that Removed state nodes were properly indexed and published
|
||||||
|
stateNodes = make([]models.StateNodeModel, 0)
|
||||||
|
pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||||
|
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
|
||||||
|
WHERE header_cids.block_number = $1 AND node_type = 3`
|
||||||
|
err = sqlxdb.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(stateNodes), 1)
|
||||||
|
stateNode := stateNodes[0]
|
||||||
|
var data []byte
|
||||||
|
dc, err := cid.Decode(stateNode.CID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID)
|
||||||
|
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'})
|
||||||
|
test_helpers.ExpectEqual(t, data, []byte{})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||||
|
setup(t)
|
||||||
|
dumpData(t)
|
||||||
|
defer tearDown(t)
|
||||||
|
|
||||||
|
// check that storage nodes were properly indexed
|
||||||
|
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
|
||||||
|
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||||
|
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||||
|
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
|
||||||
|
AND state_cids.header_id = header_cids.block_hash
|
||||||
|
AND header_cids.block_number = $1
|
||||||
|
AND storage_cids.node_type != 3`
|
||||||
|
err = sqlxdb.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(storageNodes), 1)
|
||||||
|
test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
|
||||||
|
CID: storageCID.String(),
|
||||||
|
NodeType: 2,
|
||||||
|
StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(),
|
||||||
|
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
|
||||||
|
Path: []byte{},
|
||||||
|
})
|
||||||
|
var data []byte
|
||||||
|
dc, err := cid.Decode(storageNodes[0].CID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode)
|
||||||
|
|
||||||
|
// check that Removed storage nodes were properly indexed
|
||||||
|
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
|
||||||
|
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||||
|
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||||
|
WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
|
||||||
|
AND state_cids.header_id = header_cids.block_hash
|
||||||
|
AND header_cids.block_number = $1
|
||||||
|
AND storage_cids.node_type = 3`
|
||||||
|
err = sqlxdb.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, len(storageNodes), 1)
|
||||||
|
test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
|
||||||
|
CID: shared.RemovedNodeStorageCID,
|
||||||
|
NodeType: 3,
|
||||||
|
StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(),
|
||||||
|
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
|
||||||
|
Path: []byte{'\x03'},
|
||||||
|
})
|
||||||
|
dc, err = cid.Decode(storageNodes[0].CID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
mhKey = dshelp.MultihashToDsKey(dc.Hash())
|
||||||
|
prefixedKey = blockstore.BlockPrefix.String() + mhKey.String()
|
||||||
|
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
|
||||||
|
err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
test_helpers.ExpectEqual(t, data, []byte{})
|
||||||
|
})
|
||||||
|
}
|
64
statediff/indexer/database/file/test_helpers.go
Normal file
64
statediff/indexer/database/file/test_helpers.go
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
package file
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TearDownDB is used to tear down the watcher dbs after tests
|
||||||
|
func TearDownDB(t *testing.T, db *sqlx.DB) {
|
||||||
|
tx, err := db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.header_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.uncle_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.transaction_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.receipt_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.state_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.storage_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.state_accounts`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.access_list_elements`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM eth.log_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM blocks`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = tx.Exec(`DELETE FROM nodes`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
err = tx.Commit()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
@ -18,7 +18,7 @@ package file
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"io"
|
||||||
|
|
||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||||
@ -38,7 +38,7 @@ var (
|
|||||||
|
|
||||||
// SQLWriter writes sql statements to a file
|
// SQLWriter writes sql statements to a file
|
||||||
type SQLWriter struct {
|
type SQLWriter struct {
|
||||||
file *os.File
|
wc io.WriteCloser
|
||||||
stmts chan []byte
|
stmts chan []byte
|
||||||
collatedStmt []byte
|
collatedStmt []byte
|
||||||
collationIndex int
|
collationIndex int
|
||||||
@ -50,9 +50,9 @@ type SQLWriter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewSQLWriter creates a new pointer to a Writer
|
// NewSQLWriter creates a new pointer to a Writer
|
||||||
func NewSQLWriter(file *os.File) *SQLWriter {
|
func NewSQLWriter(wc io.WriteCloser) *SQLWriter {
|
||||||
return &SQLWriter{
|
return &SQLWriter{
|
||||||
file: file,
|
wc: wc,
|
||||||
stmts: make(chan []byte),
|
stmts: make(chan []byte),
|
||||||
collatedStmt: make([]byte, collatedStmtSize),
|
collatedStmt: make([]byte, collatedStmtSize),
|
||||||
flushChan: make(chan struct{}),
|
flushChan: make(chan struct{}),
|
||||||
@ -100,7 +100,7 @@ func (sqw *SQLWriter) Loop() {
|
|||||||
func (sqw *SQLWriter) Close() error {
|
func (sqw *SQLWriter) Close() error {
|
||||||
close(sqw.quitChan)
|
close(sqw.quitChan)
|
||||||
<-sqw.doneChan
|
<-sqw.doneChan
|
||||||
return nil
|
return sqw.wc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush sends a flush signal to the looping process
|
// Flush sends a flush signal to the looping process
|
||||||
@ -110,7 +110,7 @@ func (sqw *SQLWriter) Flush() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sqw *SQLWriter) flush() error {
|
func (sqw *SQLWriter) flush() error {
|
||||||
if _, err := sqw.file.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil {
|
if _, err := sqw.wc.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sqw.collationIndex = 0
|
sqw.collationIndex = 0
|
||||||
@ -121,21 +121,21 @@ const (
|
|||||||
nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " +
|
nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', %d);\n"
|
"('%s', '%s', '%s', '%s', %d);\n"
|
||||||
|
|
||||||
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '%x');\n"
|
ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n"
|
||||||
|
|
||||||
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
|
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " +
|
||||||
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
"state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, %d);\n"
|
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, %s);\n"
|
||||||
|
|
||||||
headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
|
headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " +
|
||||||
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
"reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, NULL);\n"
|
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, NULL);\n"
|
||||||
|
|
||||||
uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
|
uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " +
|
||||||
"('%s', '%s', '%s', '%s', '%s', '%s');\n"
|
"('%s', '%s', '%s', '%s', '%s', '%s');\n"
|
||||||
|
|
||||||
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
|
txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " +
|
||||||
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%x', %d);\n"
|
"VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d);\n"
|
||||||
|
|
||||||
alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"
|
alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n"
|
||||||
|
|
||||||
@ -143,16 +143,16 @@ const (
|
|||||||
"post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"
|
"post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n"
|
||||||
|
|
||||||
logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
|
logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " +
|
||||||
"topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '%x');\n"
|
"topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n"
|
||||||
|
|
||||||
stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
|
stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " +
|
||||||
"VALUES ('%s', '%s', '%s', '%x', %d, %t, '%s');\n"
|
"VALUES ('%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
|
||||||
|
|
||||||
accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " +
|
accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " +
|
||||||
"VALUES ('%s', '%x', '%s', %d, '%x', '%s');\n"
|
"VALUES ('%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n"
|
||||||
|
|
||||||
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " +
|
||||||
"node_type, diff, mh_key) VALUES ('%s', '%x', '%s', '%s', '%x', %d, %t, '%s');\n"
|
"node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
|
||||||
@ -199,7 +199,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
|||||||
} else {
|
} else {
|
||||||
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
|
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, *header.BaseFee)
|
||||||
}
|
}
|
||||||
sqw.stmts <- []byte(stmt)
|
sqw.stmts <- []byte(stmt)
|
||||||
indexerMetrics.blocks.Inc(1)
|
indexerMetrics.blocks.Inc(1)
|
||||||
|
@ -1,216 +0,0 @@
|
|||||||
// VulcanizeDB
|
|
||||||
// Copyright © 2021 Vulcanize
|
|
||||||
|
|
||||||
// This program is free software: you can redistribute it and/or modify
|
|
||||||
// it under the terms of the GNU Affero General Public License as published by
|
|
||||||
// the Free Software Foundation, either version 3 of the License, or
|
|
||||||
// (at your option) any later version.
|
|
||||||
|
|
||||||
// This program is distributed in the hope that it will be useful,
|
|
||||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
// GNU Affero General Public License for more details.
|
|
||||||
|
|
||||||
// You should have received a copy of the GNU Affero General Public License
|
|
||||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
package sql
|
|
||||||
|
|
||||||
/*
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
"github.com/jmoiron/sqlx"
|
|
||||||
)
|
|
||||||
|
|
||||||
*/
|
|
||||||
/*
|
|
||||||
// PG_MAX_PARAMS is the max number of placeholders+args a statement can support
|
|
||||||
// above this limit we need to split into a separate batch
|
|
||||||
const PG_MAX_PARAMS int = 32767
|
|
||||||
|
|
||||||
const (
|
|
||||||
ipldInsertPgStr string = `INSERT INTO public.blocks (key, data) VALUES (unnest($1), unnest($2)) ON CONFLICT (key) DO NOTHING`
|
|
||||||
headerCIDsPgStr string = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
|
||||||
VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9), unnest($10), unnest($11), unnest($12), unnest($13), unnest($14), unnest($15), unnest($16))
|
|
||||||
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (excluded.parent_hash, excluded.cid, excluded.td, excluded.node_id, excluded.reward, excluded.state_root, excluded.tx_root, excluded.receipt_root, excluded.uncle_root, excluded.bloom, excluded.timestamp, excluded.mh_key, eth.header_cids.times_validated + 1, excluded.base_fee)
|
|
||||||
RETURNING id`
|
|
||||||
unclesCIDsPgStr string = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6))
|
|
||||||
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = (excluded.parent_hash, excluded.cid, excluded.reward, excluded.mh_key)`
|
|
||||||
txCIDsPgStr string = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9))
|
|
||||||
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = (excluded.cid, excluded.dst, excluded.src, excluded.index, excluded.mh_key, excluded.tx_data, excluded.tx_type)
|
|
||||||
RETURNING id`
|
|
||||||
accessListPgStr string = `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4))
|
|
||||||
ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = (excluded.address, excluded.storage_keys)`
|
|
||||||
rctCIDsPgStr string = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8))
|
|
||||||
ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = (excluded.leaf_cid, excluded.contract, excluded.contract_hash, excluded.leaf_mh_key, excluded.post_state, excluded.post_status, excluded.log_root)
|
|
||||||
RETURNING id`
|
|
||||||
logCIDsPgStr string = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9), unnest($10))
|
|
||||||
ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = (excluded.leaf_cid, excluded.leaf_mh_key, excluded.address, excluded.topic0, excluded.topic1, excluded.topic2, excluded.topic3, excluded.log_data)`
|
|
||||||
stateCIDsPgStr string = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7))
|
|
||||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (excluded.state_leaf_key, excluded.cid, excluded.node_type, excluded.diff, excluded.mh_key)
|
|
||||||
RETURNING id`
|
|
||||||
stateAccountsPgStr string = `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5))
|
|
||||||
ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = (excluded.balance, excluded.nonce, excluded.code_hash, excluded.storage_root)`
|
|
||||||
storageCIDsPgStr string = `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7))
|
|
||||||
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (excluded.storage_leaf_key, excluded.cid, excluded.node_type, excluded.diff, excluded.mh_key)`
|
|
||||||
)
|
|
||||||
|
|
||||||
// PostgresBatchWriter is used to write statediff data to Postgres using batch inserts/upserts
|
|
||||||
type PostgresBatchWriter struct {
|
|
||||||
db *postgres.DB
|
|
||||||
|
|
||||||
// prepared statements (prepared inside tx)
|
|
||||||
ipldsPreparedStm *sqlx.Stmt
|
|
||||||
unclesPrepared *sqlx.Stmt
|
|
||||||
txPreparedStm *sqlx.Stmt
|
|
||||||
accessListPreparedStm *sqlx.Stmt
|
|
||||||
rctPreparedStm *sqlx.Stmt
|
|
||||||
logPreparedStm *sqlx.Stmt
|
|
||||||
statePreparedStm *sqlx.Stmt
|
|
||||||
accountPreparedStm *sqlx.Stmt
|
|
||||||
storagePreparedStm *sqlx.Stmt
|
|
||||||
|
|
||||||
// cached arguments
|
|
||||||
queuedHeaderArgs models.HeaderModel
|
|
||||||
queuedUnclesArgs models.UncleBatch
|
|
||||||
queuedTxArgs models.TxBatch
|
|
||||||
queuedAccessListArgs models.AccessListBatch
|
|
||||||
queuedRctArgs models.ReceiptBatch
|
|
||||||
queuedLogArgs models.LogBatch
|
|
||||||
queuedStateArgs models.StateBatch
|
|
||||||
queuedAccountArgs models.AccountBatch
|
|
||||||
queuedStorageArgs models.StorageBatch
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPostgresBatchWriter creates a new pointer to a PostgresBatchWriter
|
|
||||||
func NewPostgresBatchWriter(db *postgres.DB) *PostgresBatchWriter {
|
|
||||||
return &PostgresBatchWriter{
|
|
||||||
db: db,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) queueHeader(header models.HeaderModel) {
|
|
||||||
pbw.queuedHeaderArgs = header
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) queueUncle(uncle models.UncleModel) {
|
|
||||||
pbw.queuedUnclesArgs.BlockHashes = append(pbw.queuedUnclesArgs.BlockHashes, uncle.BlockHash)
|
|
||||||
pbw.queuedUnclesArgs.ParentHashes = append(pbw.queuedUnclesArgs.ParentHashes, uncle.ParentHash)
|
|
||||||
pbw.queuedUnclesArgs.CIDs = append(pbw.queuedUnclesArgs.CIDs, uncle.CID)
|
|
||||||
pbw.queuedUnclesArgs.MhKeys = append(pbw.queuedUnclesArgs.MhKeys, uncle.MhKey)
|
|
||||||
pbw.queuedUnclesArgs.Rewards = append(pbw.queuedUnclesArgs.Rewards, uncle.Reward)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) queueTransaction(tx models.TxModel) {
|
|
||||||
pbw.queuedTxArgs.Indexes = append(pbw.queuedTxArgs.Indexes, tx.Index)
|
|
||||||
pbw.queuedTxArgs.TxHashes = append(pbw.queuedTxArgs.TxHashes, tx.TxHash)
|
|
||||||
pbw.queuedTxArgs.CIDs = append(pbw.queuedTxArgs.CIDs, tx.CID)
|
|
||||||
pbw.queuedTxArgs.MhKeys = append(pbw.queuedTxArgs.MhKeys, tx.MhKey)
|
|
||||||
pbw.queuedTxArgs.Dsts = append(pbw.queuedTxArgs.Dsts, tx.Dst)
|
|
||||||
pbw.queuedTxArgs.Srcs = append(pbw.queuedTxArgs.Srcs, tx.Src)
|
|
||||||
pbw.queuedTxArgs.Datas = append(pbw.queuedTxArgs.Datas, tx.Data)
|
|
||||||
pbw.queuedTxArgs.Types = append(pbw.queuedTxArgs.Types, tx.Type)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) queueAccessListElement(al models.AccessListElementModel) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) queueReceipt(rct models.ReceiptModel) {
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertTransactionCID(tx *sqlx.Tx, transaction models.TxModel, headerID int64) (int64, error) {
|
|
||||||
var txID int64
|
|
||||||
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
||||||
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9)
|
|
||||||
RETURNING id`,
|
|
||||||
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err)
|
|
||||||
}
|
|
||||||
indexerMetrics.transactions.Inc(1)
|
|
||||||
return txID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertAccessListElement(tx *sqlx.Tx, accessListElement models.AccessListElementModel, txID int64) error {
|
|
||||||
_, err := tx.Exec(`INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
|
||||||
ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)`,
|
|
||||||
txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error upserting access_list_element entry: %v", err)
|
|
||||||
}
|
|
||||||
indexerMetrics.accessListEntries.Inc(1)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertReceiptCID(tx *sqlx.Tx, rct *models.ReceiptModel, txID int64) (int64, error) {
|
|
||||||
var receiptID int64
|
|
||||||
err := tx.QueryRowx(`INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
||||||
ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8)
|
|
||||||
RETURNING id`,
|
|
||||||
txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err)
|
|
||||||
}
|
|
||||||
indexerMetrics.receipts.Inc(1)
|
|
||||||
return receiptID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel, receiptID int64) error {
|
|
||||||
for _, log := range logs {
|
|
||||||
_, err := tx.Exec(`INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
|
||||||
ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)`,
|
|
||||||
log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error upserting logs entry: %w", err)
|
|
||||||
}
|
|
||||||
indexerMetrics.logs.Inc(1)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) {
|
|
||||||
var stateID int64
|
|
||||||
var stateKey string
|
|
||||||
if stateNode.StateKey != nullHash.String() {
|
|
||||||
stateKey = stateNode.StateKey
|
|
||||||
}
|
|
||||||
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
||||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
|
|
||||||
RETURNING id`,
|
|
||||||
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
|
|
||||||
if err != nil {
|
|
||||||
return 0, fmt.Errorf("error upserting state_cids entry: %v", err)
|
|
||||||
}
|
|
||||||
return stateID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertStateAccount(tx *sqlx.Tx, stateAccount models.StateAccountModel, stateID int64) error {
|
|
||||||
_, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5)
|
|
||||||
ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`,
|
|
||||||
stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error upserting state_accounts entry: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pbw *PostgresBatchWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.StorageNodeModel, stateID int64) error {
|
|
||||||
var storageKey string
|
|
||||||
if storageCID.StorageKey != nullHash.String() {
|
|
||||||
storageKey = storageCID.StorageKey
|
|
||||||
}
|
|
||||||
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
|
||||||
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`,
|
|
||||||
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("error upserting storage_cids entry: %v", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
*/
|
|
@ -235,10 +235,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
|||||||
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
|
||||||
tx.cacheIPLD(headerNode)
|
tx.cacheIPLD(headerNode)
|
||||||
|
|
||||||
var baseFee *int64
|
var baseFee *string
|
||||||
if header.BaseFee != nil {
|
if header.BaseFee != nil {
|
||||||
baseFee = new(int64)
|
baseFee = new(string)
|
||||||
*baseFee = header.BaseFee.Int64()
|
*baseFee = header.BaseFee.String()
|
||||||
}
|
}
|
||||||
headerID := header.Hash().String()
|
headerID := header.Hash().String()
|
||||||
// index header
|
// index header
|
||||||
@ -541,5 +541,5 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
|||||||
|
|
||||||
// Close satisfies io.Closer
|
// Close satisfies io.Closer
|
||||||
func (sdi *StateDiffIndexer) Close() error {
|
func (sdi *StateDiffIndexer) Close() error {
|
||||||
return sdi.dbWriter.db.Close()
|
return sdi.dbWriter.Close()
|
||||||
}
|
}
|
||||||
|
@ -60,7 +60,7 @@ func setupLegacyPGX(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPGXIndexerLegacy(t *testing.T) {
|
func TestPGXIndexerLegacy(t *testing.T) {
|
||||||
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
|
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||||
setupLegacyPGX(t)
|
setupLegacyPGX(t)
|
||||||
defer tearDown(t)
|
defer tearDown(t)
|
||||||
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
|
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
|
||||||
|
@ -140,9 +140,7 @@ func setupPGX(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
for _, node := range mocks.StateDiffs {
|
for _, node := range mocks.StateDiffs {
|
||||||
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
||||||
@ -152,7 +150,7 @@ func TestPGXIndexer(t *testing.T) {
|
|||||||
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||||
setupPGX(t)
|
setupPGX(t)
|
||||||
defer tearDown(t)
|
defer tearDown(t)
|
||||||
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
|
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, cast(base_fee AS TEXT)
|
||||||
FROM eth.header_cids
|
FROM eth.header_cids
|
||||||
WHERE block_number = $1`
|
WHERE block_number = $1`
|
||||||
// check header was properly indexed
|
// check header was properly indexed
|
||||||
@ -161,7 +159,7 @@ func TestPGXIndexer(t *testing.T) {
|
|||||||
TD string
|
TD string
|
||||||
Reward string
|
Reward string
|
||||||
BlockHash string `db:"block_hash"`
|
BlockHash string `db:"block_hash"`
|
||||||
BaseFee *int64 `db:"base_fee"`
|
BaseFee *string `db:"base_fee"`
|
||||||
}
|
}
|
||||||
header := new(res)
|
header := new(res)
|
||||||
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
|
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
|
||||||
@ -176,7 +174,7 @@ func TestPGXIndexer(t *testing.T) {
|
|||||||
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
|
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
|
||||||
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
|
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
|
||||||
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
|
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
|
||||||
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64())
|
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
|
||||||
dc, err := cid.Decode(header.CID)
|
dc, err := cid.Decode(header.CID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
|
|||||||
var DefaultConfig = Config{
|
var DefaultConfig = Config{
|
||||||
Hostname: "localhost",
|
Hostname: "localhost",
|
||||||
Port: 5432,
|
Port: 5432,
|
||||||
DatabaseName: "vulcanize_test",
|
DatabaseName: "vulcanize_testing",
|
||||||
Username: "postgres",
|
Username: "postgres",
|
||||||
Password: "",
|
Password: "password",
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config holds params for a Postgres db
|
// Config holds params for a Postgres db
|
||||||
|
@ -47,10 +47,10 @@ func TestPostgresPGX(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err)
|
t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err)
|
||||||
}
|
}
|
||||||
defer dbPool.Close()
|
|
||||||
if dbPool == nil {
|
if dbPool == nil {
|
||||||
t.Fatal("DB pool is nil")
|
t.Fatal("DB pool is nil")
|
||||||
}
|
}
|
||||||
|
dbPool.Close()
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("serializes big.Int to db", func(t *testing.T) {
|
t.Run("serializes big.Int to db", func(t *testing.T) {
|
||||||
|
@ -39,11 +39,15 @@ func TestPostgresSQLX(t *testing.T) {
|
|||||||
|
|
||||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err)
|
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||||
}
|
}
|
||||||
if sqlxdb == nil {
|
if sqlxdb == nil {
|
||||||
t.Fatal("DB is nil")
|
t.Fatal("DB is nil")
|
||||||
}
|
}
|
||||||
|
err = sqlxdb.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("serializes big.Int to db", func(t *testing.T) {
|
t.Run("serializes big.Int to db", func(t *testing.T) {
|
||||||
@ -59,9 +63,7 @@ func TestPostgresSQLX(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err != nil {
|
defer db.Close()
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
bi := new(big.Int)
|
bi := new(big.Int)
|
||||||
bi.SetString("34940183920000000000", 10)
|
bi.SetString("34940183920000000000", 10)
|
||||||
|
@ -70,7 +70,7 @@ func setupLegacySQLX(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestSQLXIndexerLegacy(t *testing.T) {
|
func TestSQLXIndexerLegacy(t *testing.T) {
|
||||||
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
|
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||||
setupLegacySQLX(t)
|
setupLegacySQLX(t)
|
||||||
defer tearDown(t)
|
defer tearDown(t)
|
||||||
pgStr := `SELECT cid, td, reward, block_hash, base_fee
|
pgStr := `SELECT cid, td, reward, block_hash, base_fee
|
||||||
|
@ -160,9 +160,7 @@ func setupSQLX(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
for _, node := range mocks.StateDiffs {
|
for _, node := range mocks.StateDiffs {
|
||||||
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
|
||||||
if err != nil {
|
require.NoError(t, err)
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
||||||
@ -188,7 +186,7 @@ func TestSQLXIndexer(t *testing.T) {
|
|||||||
TD string
|
TD string
|
||||||
Reward string
|
Reward string
|
||||||
BlockHash string `db:"block_hash"`
|
BlockHash string `db:"block_hash"`
|
||||||
BaseFee *int64 `db:"base_fee"`
|
BaseFee *string `db:"base_fee"`
|
||||||
}
|
}
|
||||||
header := new(res)
|
header := new(res)
|
||||||
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
|
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
|
||||||
@ -198,7 +196,7 @@ func TestSQLXIndexer(t *testing.T) {
|
|||||||
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
|
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
|
||||||
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
|
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
|
||||||
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
|
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
|
||||||
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64())
|
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String())
|
||||||
dc, err := cid.Decode(header.CID)
|
dc, err := cid.Decode(header.CID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -33,6 +33,10 @@ func TearDownDB(t *testing.T, db Database) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
_, err = tx.Exec(ctx, `DELETE FROM eth.uncle_cids`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
_, err = tx.Exec(ctx, `DELETE FROM eth.transaction_cids`)
|
_, err = tx.Exec(ctx, `DELETE FROM eth.transaction_cids`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -65,6 +69,10 @@ func TearDownDB(t *testing.T, db Database) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
_, err = tx.Exec(ctx, `DELETE FROM nodes`)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
err = tx.Commit(ctx)
|
err = tx.Commit(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
@ -39,14 +39,19 @@ func NewWriter(db Database) *Writer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close satisfies io.Closer
|
||||||
|
func (w *Writer) Close() error {
|
||||||
|
return w.db.Close()
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||||
ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
|
ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertHeaderStm(),
|
||||||
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot,
|
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot,
|
||||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
|
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting header_cids entry: %v", err)
|
return fmt.Errorf("error upserting header_cids entry: %v", err)
|
||||||
@ -59,8 +64,8 @@ func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
|||||||
INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
|
INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
ON CONFLICT (block_hash) DO NOTHING
|
ON CONFLICT (block_hash) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
|
func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
|
||||||
uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
|
uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
|
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
|
||||||
@ -72,8 +77,8 @@ func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
|
|||||||
INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||||
ON CONFLICT (tx_hash) DO NOTHING
|
ON CONFLICT (tx_hash) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
|
func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(),
|
||||||
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)
|
transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
|
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
|
||||||
@ -86,8 +91,8 @@ func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error
|
|||||||
INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
|
||||||
ON CONFLICT (tx_id, index) DO NOTHING
|
ON CONFLICT (tx_id, index) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
|
func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(),
|
||||||
accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
|
accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting access_list_element entry: %v", err)
|
return fmt.Errorf("error upserting access_list_element entry: %v", err)
|
||||||
@ -100,8 +105,8 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access
|
|||||||
INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
ON CONFLICT (tx_id) DO NOTHING
|
ON CONFLICT (tx_id) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(),
|
||||||
rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)
|
rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting receipt_cids entry: %w", err)
|
return fmt.Errorf("error upserting receipt_cids entry: %w", err)
|
||||||
@ -114,9 +119,9 @@ func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
|||||||
INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||||
ON CONFLICT (rct_id, index) DO NOTHING
|
ON CONFLICT (rct_id, index) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
||||||
for _, log := range logs {
|
for _, log := range logs {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(),
|
||||||
log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
|
log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting logs entry: %w", err)
|
return fmt.Errorf("error upserting logs entry: %w", err)
|
||||||
@ -130,12 +135,12 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
|||||||
INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
|
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
||||||
var stateKey string
|
var stateKey string
|
||||||
if stateNode.StateKey != nullHash.String() {
|
if stateNode.StateKey != nullHash.String() {
|
||||||
stateKey = stateNode.StateKey
|
stateKey = stateNode.StateKey
|
||||||
}
|
}
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(),
|
||||||
stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)
|
stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting state_cids entry: %v", err)
|
return fmt.Errorf("error upserting state_cids entry: %v", err)
|
||||||
@ -147,8 +152,8 @@ func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
|||||||
INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
|
INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
|
||||||
ON CONFLICT (header_id, state_path) DO NOTHING
|
ON CONFLICT (header_id, state_path) DO NOTHING
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
|
func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(),
|
||||||
stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting state_accounts entry: %v", err)
|
return fmt.Errorf("error upserting state_accounts entry: %v", err)
|
||||||
@ -160,12 +165,12 @@ func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountMode
|
|||||||
INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)
|
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)
|
||||||
*/
|
*/
|
||||||
func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
|
func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
|
||||||
var storageKey string
|
var storageKey string
|
||||||
if storageCID.StorageKey != nullHash.String() {
|
if storageCID.StorageKey != nullHash.String() {
|
||||||
storageKey = storageCID.StorageKey
|
storageKey = storageCID.StorageKey
|
||||||
}
|
}
|
||||||
_, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(),
|
_, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(),
|
||||||
storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
|
storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error upserting storage_cids entry: %v", err)
|
return fmt.Errorf("error upserting storage_cids entry: %v", err)
|
||||||
|
@ -22,18 +22,16 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/trie"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/ethereum/go-ethereum/statediff/test_helpers"
|
"github.com/ethereum/go-ethereum/statediff/test_helpers"
|
||||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||||
|
"github.com/ethereum/go-ethereum/trie"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Test variables
|
// Test variables
|
||||||
|
@ -41,7 +41,7 @@ type HeaderModel struct {
|
|||||||
Bloom []byte `db:"bloom"`
|
Bloom []byte `db:"bloom"`
|
||||||
Timestamp uint64 `db:"timestamp"`
|
Timestamp uint64 `db:"timestamp"`
|
||||||
TimesValidated int64 `db:"times_validated"`
|
TimesValidated int64 `db:"times_validated"`
|
||||||
BaseFee *int64 `db:"base_fee"`
|
BaseFee *string `db:"base_fee"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UncleModel is the db model for eth.uncle_cids
|
// UncleModel is the db model for eth.uncle_cids
|
||||||
|
Loading…
Reference in New Issue
Block a user