diff --git a/cmd/geth/config.go b/cmd/geth/config.go
index 9a8b169be..d77e261f9 100644
--- a/cmd/geth/config.go
+++ b/cmd/geth/config.go
@@ -45,6 +45,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff"
dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
@@ -204,6 +205,10 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
utils.Fatalf("%v", err)
}
switch dbType {
+ case shared.FILE:
+ indexerConfig = file.Config{
+ FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
+ }
case shared.POSTGRES:
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)
driverType, err := postgres.ResolveDriverType(driverTypeStr)
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index c92810d11..990b40a60 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -167,6 +167,7 @@ var (
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
+ utils.StateDiffFilePath,
configFileFlag,
utils.CatalystFlag,
}
diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go
index 68e2a3f4c..885cc2c16 100644
--- a/cmd/geth/usage.go
+++ b/cmd/geth/usage.go
@@ -243,6 +243,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
+ utils.StateDiffFilePath,
},
},
{
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 08f9088f5..ccc9ac89e 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -788,7 +788,7 @@ var (
}
StateDiffDBTypeFlag = cli.StringFlag{
Name: "statediff.db.type",
- Usage: "Statediff database type",
+ Usage: "Statediff database type (current options: postgres, file, dump)",
Value: "postgres",
}
StateDiffDBDriverTypeFlag = cli.StringFlag{
@@ -852,6 +852,10 @@ var (
Name: "statediff.db.nodeid",
Usage: "Node ID to use when writing state diffs to database",
}
+ StateDiffFilePath = cli.StringFlag{
+ Name: "statediff.file.path",
+ Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
+ }
StateDiffDBClientNameFlag = cli.StringFlag{
Name: "statediff.db.clientname",
Usage: "Client name to use when writing state diffs to database",
diff --git a/statediff/README.md b/statediff/README.md
index 97666d50a..7170363ae 100644
--- a/statediff/README.md
+++ b/statediff/README.md
@@ -79,7 +79,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff` flag is used to turn on the service
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
-`--statediff.db.type` is the type of database we write out to (current options: postgres and dump)
+`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
`--statediff.db.host` is the hostname/ip to dial to connect to the database
@@ -95,6 +95,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
`--statediff.db.nodeid` is the node id to use in the Postgres database
`--statediff.db.clientname` is the client name to use in the Postgres database
+`--statediff.file.path` is the full path (including filename) to write statediff data out to when operating in file mode
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
diff --git a/statediff/builder.go b/statediff/builder.go
index eacfeca15..8dc3cece8 100644
--- a/statediff/builder.go
+++ b/statediff/builder.go
@@ -23,16 +23,14 @@ import (
"bytes"
"fmt"
- "github.com/ethereum/go-ethereum/statediff/trie_helpers"
-
- types2 "github.com/ethereum/go-ethereum/statediff/types"
-
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/statediff/trie_helpers"
+ types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
)
diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go
index 7a44638d0..bfb746080 100644
--- a/statediff/indexer/constructor.go
+++ b/statediff/indexer/constructor.go
@@ -22,6 +22,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
+ "github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
@@ -32,10 +33,17 @@ import (
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
switch config.Type() {
+ case shared.FILE:
+ fc, ok := config.(file.Config)
+ if !ok {
+ return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
+ }
+ fc.NodeInfo = nodeInfo
+ return file.NewStateDiffIndexer(ctx, chainConfig, fc)
case shared.POSTGRES:
pgc, ok := config.(postgres.Config)
if !ok {
- return nil, fmt.Errorf("ostgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
+ return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
}
var err error
var driver sql.Driver
diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go
index 357a78ece..b75fb1af9 100644
--- a/statediff/indexer/database/dump/indexer.go
+++ b/statediff/indexer/database/dump/indexer.go
@@ -136,7 +136,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// Publish and index header, collect headerID
- var headerID int64
+ var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil {
return nil, err
@@ -181,7 +181,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
-func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
+func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)
var baseFee *int64
@@ -190,12 +190,13 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
*baseFee = header.BaseFee.Int64()
}
+ headerID := header.Hash().String()
mod := models.HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(),
- BlockHash: header.Hash().String(),
+ BlockHash: headerID,
TotalDifficulty: td.String(),
Reward: reward.String(),
Bloom: header.Bloom.Bytes(),
@@ -207,11 +208,11 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
BaseFee: baseFee,
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
- return 0, err
+ return headerID, err
}
// processUncles publishes and indexes uncle IPLDs in Postgres
-func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
+func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
@@ -223,6 +224,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
+ HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
@@ -238,7 +240,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
// processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct {
- headerID int64
+ headerID string
blockNumber *big.Int
receipts types.Receipts
txs types.Transactions
@@ -263,59 +265,24 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
tx.cacheIPLD(txNode)
// Indexing
- // extract topic and contract data from the receipt for indexing
- mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
- logDataSet := make([]*models.LogsModel, len(receipt.Logs))
- for idx, l := range receipt.Logs {
- topicSet := make([]string, 4)
- for ti, topic := range l.Topics {
- topicSet[ti] = topic.Hex()
- }
-
- if !args.logLeafNodeCIDs[i][idx].Defined() {
- return fmt.Errorf("invalid log cid")
- }
-
- mappedContracts[l.Address.String()] = true
- logDataSet[idx] = &models.LogsModel{
- Address: l.Address.String(),
- Index: int64(l.Index),
- Data: l.Data,
- LeafCID: args.logLeafNodeCIDs[i][idx].String(),
- LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
- Topic0: topicSet[0],
- Topic1: topicSet[1],
- Topic2: topicSet[2],
- Topic3: topicSet[3],
- }
- }
- // these are the contracts seen in the logs
- logContracts := make([]string, 0, len(mappedContracts))
- for addr := range mappedContracts {
- logContracts = append(logContracts, addr)
- }
- // this is the contract address if this receipt is for a contract creation tx
- contract := shared.HandleZeroAddr(receipt.ContractAddress)
- var contractHash string
- if contract != "" {
- contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
- }
- // index tx first so that the receipt can reference it by FK
+ // index tx
trx := args.txs[i]
+ trxID := trx.Hash().String()
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
- Dst: shared.HandleZeroAddrPointer(trx.To()),
- Src: shared.HandleZeroAddr(from),
- TxHash: trx.Hash().String(),
- Index: int64(i),
- Data: trx.Data(),
- CID: txNode.Cid().String(),
- MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
- Type: trx.Type(),
+ HeaderID: args.headerID,
+ Dst: shared.HandleZeroAddrPointer(trx.To()),
+ Src: shared.HandleZeroAddr(from),
+ TxHash: trxID,
+ Index: int64(i),
+ Data: trx.Data(),
+ CID: txNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
+ Type: trx.Type(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
@@ -328,6 +295,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
+ TxID: trxID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
@@ -337,12 +305,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}
}
+ // this is the contract address if this receipt is for a contract creation tx
+ contract := shared.HandleZeroAddr(receipt.ContractAddress)
+ var contractHash string
+ if contract != "" {
+ contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
+ }
+
// index the receipt
if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid")
}
rctModel := &models.ReceiptModel{
+ TxID: trxID,
Contract: contract,
ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(),
@@ -359,6 +335,31 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err
}
+ logDataSet := make([]*models.LogsModel, len(receipt.Logs))
+ for idx, l := range receipt.Logs {
+ topicSet := make([]string, 4)
+ for ti, topic := range l.Topics {
+ topicSet[ti] = topic.Hex()
+ }
+
+ if !args.logLeafNodeCIDs[i][idx].Defined() {
+ return fmt.Errorf("invalid log cid")
+ }
+
+ logDataSet[idx] = &models.LogsModel{
+ ReceiptID: trxID,
+ Address: l.Address.String(),
+ Index: int64(l.Index),
+ Data: l.Data,
+ LeafCID: args.logLeafNodeCIDs[i][idx].String(),
+ LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+ Topic0: topicSet[0],
+ Topic1: topicSet[1],
+ Topic2: topicSet[2],
+ Topic3: topicSet[3],
+ }
+ }
+
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil {
return err
}
@@ -374,7 +375,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
-func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error {
+func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@@ -384,6 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{
+ HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@@ -398,6 +400,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
+ HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
@@ -422,6 +425,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Balance: account.Balance.String(),
Nonce: account.Nonce,
CodeHash: account.CodeHash,
@@ -437,6 +442,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
@@ -453,6 +460,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
@@ -482,7 +491,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
return nil
}
-// Close satisfied io.Closer
+// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()
}
diff --git a/statediff/indexer/database/file/batch_tx.go b/statediff/indexer/database/file/batch_tx.go
new file mode 100644
index 000000000..39e5d3713
--- /dev/null
+++ b/statediff/indexer/database/file/batch_tx.go
@@ -0,0 +1,29 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+// BatchTx wraps the state necessary for building the tx concurrently during trie difference iteration
+type BatchTx struct {
+ BlockNumber uint64
+
+ submit func(blockTx *BatchTx, err error) error
+}
+
+// Submit satisfies interfaces.Batch
+func (tx *BatchTx) Submit(err error) error {
+ return tx.submit(tx, err)
+}
diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go
new file mode 100644
index 000000000..2553174a3
--- /dev/null
+++ b/statediff/indexer/database/file/config.go
@@ -0,0 +1,33 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+import (
+ "github.com/ethereum/go-ethereum/statediff/indexer/node"
+ "github.com/ethereum/go-ethereum/statediff/indexer/shared"
+)
+
+// Config holds params for writing sql statements out to a file
+type Config struct {
+ FilePath string
+ NodeInfo node.Info
+}
+
+// Type satisfies interfaces.Config
+func (c Config) Type() shared.DBType {
+ return shared.FILE
+}
diff --git a/statediff/indexer/database/file/helpers.go b/statediff/indexer/database/file/helpers.go
new file mode 100644
index 000000000..dc635110c
--- /dev/null
+++ b/statediff/indexer/database/file/helpers.go
@@ -0,0 +1,60 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+import "bytes"
+
+// formatPostgresStringArray parses an array of strings into the proper Postgres string representation of that array
+func formatPostgresStringArray(a []string) string {
+ if a == nil {
+ return ""
+ }
+
+ if n := len(a); n > 0 {
+ // There will be at least two curly brackets, 2*N bytes of quotes,
+ // and N-1 bytes of delimiters.
+ b := make([]byte, 1, 1+3*n)
+ b[0] = '{'
+
+ b = appendArrayQuotedBytes(b, []byte(a[0]))
+ for i := 1; i < n; i++ {
+ b = append(b, ',')
+ b = appendArrayQuotedBytes(b, []byte(a[i]))
+ }
+
+ return string(append(b, '}'))
+ }
+
+ return "{}"
+}
+
+func appendArrayQuotedBytes(b, v []byte) []byte {
+ b = append(b, '"')
+ for {
+ i := bytes.IndexAny(v, `"\`)
+ if i < 0 {
+ b = append(b, v...)
+ break
+ }
+ if i > 0 {
+ b = append(b, v[:i]...)
+ }
+ b = append(b, '\\', v[i])
+ v = v[i+1:]
+ }
+ return append(b, '"')
+}
diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go
new file mode 100644
index 000000000..1cc19480a
--- /dev/null
+++ b/statediff/indexer/database/file/indexer.go
@@ -0,0 +1,474 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/big"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/ipfs/go-cid"
+ node "github.com/ipfs/go-ipld-format"
+ "github.com/multiformats/go-multihash"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rlp"
+ "github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
+ ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
+ "github.com/ethereum/go-ethereum/statediff/indexer/models"
+ "github.com/ethereum/go-ethereum/statediff/indexer/shared"
+ sdtypes "github.com/ethereum/go-ethereum/statediff/types"
+)
+
+const defaultFilePath = "./statediff.sql"
+
+var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
+
+var (
+ indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
+)
+
+// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a flat file
+type StateDiffIndexer struct {
+ writer *SQLWriter
+ chainConfig *params.ChainConfig
+ nodeID string
+ wg *sync.WaitGroup
+}
+
+// NewStateDiffIndexer creates a file-backed implementation of interfaces.StateDiffIndexer
+func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
+ filePath := config.FilePath
+ if filePath == "" {
+ filePath = defaultFilePath
+ }
+ if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
+ return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
+ }
+ file, err := os.Create(filePath)
+ if err != nil {
+ return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
+ }
+ w := NewSQLWriter(file)
+ wg := new(sync.WaitGroup)
+ w.Loop()
+ return &StateDiffIndexer{
+ writer: w,
+ chainConfig: chainConfig,
+ nodeID: config.NodeInfo.ID,
+ wg: wg,
+ }, nil
+}
+
+// ReportDBMetrics has nothing to report for the file indexer
+func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
+
+// PushBlock writes block data insert SQL stmts to a file, except state & storage nodes (includes header, uncles, transactions & receipts)
+// Returns an initiated batch which must be submitted (via Submit) to flush the pending writes
+func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
+ start, t := time.Now(), time.Now()
+ blockHash := block.Hash()
+ blockHashStr := blockHash.String()
+ height := block.NumberU64()
+ traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
+ transactions := block.Transactions()
+ // Derive any missing fields
+ if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, transactions); err != nil {
+ return nil, err
+ }
+
+ // Generate the block iplds
+ headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts)
+ if err != nil {
+ return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
+ }
+
+ if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
+ return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
+ }
+ if len(txTrieNodes) != len(rctTrieNodes) {
+ return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
+ }
+
+ // Calculate reward
+ var reward *big.Int
+ // in PoA networks block reward is 0
+ if sdi.chainConfig.Clique != nil {
+ reward = big.NewInt(0)
+ } else {
+ reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
+ }
+ t = time.Now()
+
+ blockTx := &BatchTx{
+ BlockNumber: height,
+ submit: func(self *BatchTx, err error) error {
+ tDiff := time.Since(t)
+ indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
+ traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
+ t = time.Now()
+ if err := sdi.writer.flush(); err != nil {
+ traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
+ log.Debug(traceMsg)
+ return err
+ }
+ tDiff = time.Since(t)
+ indexerMetrics.tPostgresCommit.Update(tDiff)
+ traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
+ traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
+ log.Debug(traceMsg)
+ return err
+ },
+ }
+ tDiff := time.Since(t)
+ indexerMetrics.tFreePostgres.Update(tDiff)
+ traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
+ t = time.Now()
+
+ // write header, collect headerID
+ headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
+ tDiff = time.Since(t)
+ indexerMetrics.tHeaderProcessing.Update(tDiff)
+ traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
+ t = time.Now()
+
+ // write uncles
+ sdi.processUncles(headerID, height, uncleNodes)
+ tDiff = time.Since(t)
+ indexerMetrics.tUncleProcessing.Update(tDiff)
+ traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
+ t = time.Now()
+
+ // write receipts and txs
+ err = sdi.processReceiptsAndTxs(processArgs{
+ headerID: headerID,
+ blockNumber: block.Number(),
+ receipts: receipts,
+ txs: transactions,
+ rctNodes: rctNodes,
+ rctTrieNodes: rctTrieNodes,
+ txNodes: txNodes,
+ txTrieNodes: txTrieNodes,
+ logTrieNodes: logTrieNodes,
+ logLeafNodeCIDs: logLeafNodeCIDs,
+ rctLeafNodeCIDs: rctLeafNodeCIDs,
+ })
+ if err != nil {
+ return nil, err
+ }
+ tDiff = time.Since(t)
+ indexerMetrics.tTxAndRecProcessing.Update(tDiff)
+ traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
+ t = time.Now()
+
+ return blockTx, err
+}
+
+// processHeader writes a header IPLD insert SQL stmt to a file
+// it returns the headerID
+func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
+ sdi.writer.upsertIPLDNode(headerNode)
+
+ var baseFee *int64
+ if header.BaseFee != nil {
+ baseFee = new(int64)
+ *baseFee = header.BaseFee.Int64()
+ }
+ headerID := header.Hash().String()
+ sdi.writer.upsertHeaderCID(models.HeaderModel{
+ NodeID: sdi.nodeID,
+ CID: headerNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
+ ParentHash: header.ParentHash.String(),
+ BlockNumber: header.Number.String(),
+ BlockHash: headerID,
+ TotalDifficulty: td.String(),
+ Reward: reward.String(),
+ Bloom: header.Bloom.Bytes(),
+ StateRoot: header.Root.String(),
+ RctRoot: header.ReceiptHash.String(),
+ TxRoot: header.TxHash.String(),
+ UncleRoot: header.UncleHash.String(),
+ Timestamp: header.Time,
+ BaseFee: baseFee,
+ })
+ return headerID
+}
+
+// processUncles writes uncle IPLD insert SQL stmts to a file
+func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
+ // publish and index uncles
+ for _, uncleNode := range uncleNodes {
+ sdi.writer.upsertIPLDNode(uncleNode)
+ var uncleReward *big.Int
+ // in PoA networks uncle reward is 0
+ if sdi.chainConfig.Clique != nil {
+ uncleReward = big.NewInt(0)
+ } else {
+ uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
+ }
+ sdi.writer.upsertUncleCID(models.UncleModel{
+ HeaderID: headerID,
+ CID: uncleNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
+ ParentHash: uncleNode.ParentHash.String(),
+ BlockHash: uncleNode.Hash().String(),
+ Reward: uncleReward.String(),
+ })
+ }
+}
+
+// processArgs bundles arguments to processReceiptsAndTxs
+type processArgs struct {
+ headerID string
+ blockNumber *big.Int
+ receipts types.Receipts
+ txs types.Transactions
+ rctNodes []*ipld2.EthReceipt
+ rctTrieNodes []*ipld2.EthRctTrie
+ txNodes []*ipld2.EthTx
+ txTrieNodes []*ipld2.EthTxTrie
+ logTrieNodes [][]*ipld2.EthLogTrie
+ logLeafNodeCIDs [][]cid.Cid
+ rctLeafNodeCIDs []cid.Cid
+}
+
+// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file
+func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
+ // Process receipts and txs
+ signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
+ for i, receipt := range args.receipts {
+ for _, logTrieNode := range args.logTrieNodes[i] {
+ sdi.writer.upsertIPLDNode(logTrieNode)
+ }
+ txNode := args.txNodes[i]
+ sdi.writer.upsertIPLDNode(txNode)
+
+ // index tx
+ trx := args.txs[i]
+ txID := trx.Hash().String()
+ // derive sender for the tx that corresponds with this receipt
+ from, err := types.Sender(signer, trx)
+ if err != nil {
+ return fmt.Errorf("error deriving tx sender: %v", err)
+ }
+ txModel := models.TxModel{
+ HeaderID: args.headerID,
+ Dst: shared.HandleZeroAddrPointer(trx.To()),
+ Src: shared.HandleZeroAddr(from),
+ TxHash: txID,
+ Index: int64(i),
+ Data: trx.Data(),
+ CID: txNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
+ Type: trx.Type(),
+ }
+ sdi.writer.upsertTransactionCID(txModel)
+
+ // index access list if this is one
+ for j, accessListElement := range trx.AccessList() {
+ storageKeys := make([]string, len(accessListElement.StorageKeys))
+ for k, storageKey := range accessListElement.StorageKeys {
+ storageKeys[k] = storageKey.Hex()
+ }
+ accessListElementModel := models.AccessListElementModel{
+ TxID: txID,
+ Index: int64(j),
+ Address: accessListElement.Address.Hex(),
+ StorageKeys: storageKeys,
+ }
+ sdi.writer.upsertAccessListElement(accessListElementModel)
+ }
+
+ // this is the contract address if this receipt is for a contract creation tx
+ contract := shared.HandleZeroAddr(receipt.ContractAddress)
+ var contractHash string
+ if contract != "" {
+ contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
+ }
+
+ // index receipt
+ if !args.rctLeafNodeCIDs[i].Defined() {
+ return fmt.Errorf("invalid receipt leaf node cid")
+ }
+
+ rctModel := &models.ReceiptModel{
+ TxID: txID,
+ Contract: contract,
+ ContractHash: contractHash,
+ LeafCID: args.rctLeafNodeCIDs[i].String(),
+ LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]),
+ LogRoot: args.rctNodes[i].LogRoot.String(),
+ }
+ if len(receipt.PostState) == 0 {
+ rctModel.PostStatus = receipt.Status
+ } else {
+ rctModel.PostState = common.Bytes2Hex(receipt.PostState)
+ }
+ sdi.writer.upsertReceiptCID(rctModel)
+
+ // index logs
+ logDataSet := make([]*models.LogsModel, len(receipt.Logs))
+ for idx, l := range receipt.Logs {
+ topicSet := make([]string, 4)
+ for ti, topic := range l.Topics {
+ topicSet[ti] = topic.Hex()
+ }
+
+ if !args.logLeafNodeCIDs[i][idx].Defined() {
+ return fmt.Errorf("invalid log cid")
+ }
+
+ logDataSet[idx] = &models.LogsModel{
+ ReceiptID: txID,
+ Address: l.Address.String(),
+ Index: int64(l.Index),
+ Data: l.Data,
+ LeafCID: args.logLeafNodeCIDs[i][idx].String(),
+ LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+ Topic0: topicSet[0],
+ Topic1: topicSet[1],
+ Topic2: topicSet[2],
+ Topic3: topicSet[3],
+ }
+ }
+ sdi.writer.upsertLogCID(logDataSet)
+ }
+
+ // publish trie nodes, these aren't indexed directly
+ for i, n := range args.txTrieNodes {
+ sdi.writer.upsertIPLDNode(n)
+ sdi.writer.upsertIPLDNode(args.rctTrieNodes[i])
+ }
+
+ return nil
+}
+
+// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
+func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
+ // publish the state node
+ if stateNode.NodeType == sdtypes.Removed {
+ // short circuit if it is a Removed node
+ // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+ stateModel := models.StateNodeModel{
+ HeaderID: headerID,
+ Path: stateNode.Path,
+ StateKey: common.BytesToHash(stateNode.LeafKey).String(),
+ CID: shared.RemovedNodeStateCID,
+ MhKey: shared.RemovedNodeMhKey,
+ NodeType: stateNode.NodeType.Int(),
+ }
+ sdi.writer.upsertStateCID(stateModel)
+ return nil
+ }
+ stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
+ if err != nil {
+ return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
+ }
+ stateModel := models.StateNodeModel{
+ HeaderID: headerID,
+ Path: stateNode.Path,
+ StateKey: common.BytesToHash(stateNode.LeafKey).String(),
+ CID: stateCIDStr,
+ MhKey: stateMhKey,
+ NodeType: stateNode.NodeType.Int(),
+ }
+ // index the state node
+ sdi.writer.upsertStateCID(stateModel)
+ // if we have a leaf, decode and index the account data
+ if stateNode.NodeType == sdtypes.Leaf {
+ var i []interface{}
+ if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
+ return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
+ }
+ if len(i) != 2 {
+ return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
+ }
+ var account types.StateAccount
+ if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
+ return fmt.Errorf("error decoding state account rlp: %s", err.Error())
+ }
+ accountModel := models.StateAccountModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
+ Balance: account.Balance.String(),
+ Nonce: account.Nonce,
+ CodeHash: account.CodeHash,
+ StorageRoot: account.Root.String(),
+ }
+ sdi.writer.upsertStateAccount(accountModel)
+ }
+ // if there are any storage nodes associated with this node, publish and index them
+ for _, storageNode := range stateNode.StorageNodes {
+ if storageNode.NodeType == sdtypes.Removed {
+ // short circuit if it is a Removed node
+ // this assumes the db has been initialized and a public.blocks entry for the Removed node is present
+ storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
+ Path: storageNode.Path,
+ StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
+ CID: shared.RemovedNodeStorageCID,
+ MhKey: shared.RemovedNodeMhKey,
+ NodeType: storageNode.NodeType.Int(),
+ }
+ sdi.writer.upsertStorageCID(storageModel)
+ continue
+ }
+ storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
+ if err != nil {
+ return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
+ }
+ storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
+ Path: storageNode.Path,
+ StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
+ CID: storageCIDStr,
+ MhKey: storageMhKey,
+ NodeType: storageNode.NodeType.Int(),
+ }
+ sdi.writer.upsertStorageCID(storageModel)
+ }
+
+ return nil
+}
+
+// PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file
+func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
+ // codec doesn't matter since db key is multihash-based
+ mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
+ if err != nil {
+ return fmt.Errorf("error deriving multihash key from codehash: %v", err)
+ }
+ sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
+ return nil
+}
+
+// Close satisfies io.Closer
+func (sdi *StateDiffIndexer) Close() error {
+ return sdi.writer.Close()
+}
diff --git a/statediff/indexer/database/file/metrics.go b/statediff/indexer/database/file/metrics.go
new file mode 100644
index 000000000..ca6e88f2b
--- /dev/null
+++ b/statediff/indexer/database/file/metrics.go
@@ -0,0 +1,94 @@
+// VulcanizeDB
+// Copyright © 2021 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+import (
+ "strings"
+
+ "github.com/ethereum/go-ethereum/metrics"
+)
+
+const (
+ namespace = "statediff"
+)
+
+// Build a fully qualified metric name
+func metricName(subsystem, name string) string {
+ if name == "" {
+ return ""
+ }
+ parts := []string{namespace, name}
+ if subsystem != "" {
+ parts = []string{namespace, subsystem, name}
+ }
+ // geth metrics use "/" separators; the Prometheus exporter replaces them with "_"
+ return strings.Join(parts, "/")
+}
+
+type indexerMetricsHandles struct {
+ // The total number of processed blocks
+ blocks metrics.Counter
+ // The total number of processed transactions
+ transactions metrics.Counter
+ // The total number of processed receipts
+ receipts metrics.Counter
+ // The total number of processed logs
+ logs metrics.Counter
+ // The total number of access list entries processed
+ accessListEntries metrics.Counter
+ // Time spent waiting for free postgres tx
+ tFreePostgres metrics.Timer
+ // Postgres transaction commit duration
+ tPostgresCommit metrics.Timer
+ // Header processing time
+ tHeaderProcessing metrics.Timer
+ // Uncle processing time
+ tUncleProcessing metrics.Timer
+ // Tx and receipt processing time
+ tTxAndRecProcessing metrics.Timer
+ // State, storage, and code combined processing time
+ tStateStoreCodeProcessing metrics.Timer
+}
+
+func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
+ ctx := indexerMetricsHandles{
+ blocks: metrics.NewCounter(),
+ transactions: metrics.NewCounter(),
+ receipts: metrics.NewCounter(),
+ logs: metrics.NewCounter(),
+ accessListEntries: metrics.NewCounter(),
+ tFreePostgres: metrics.NewTimer(),
+ tPostgresCommit: metrics.NewTimer(),
+ tHeaderProcessing: metrics.NewTimer(),
+ tUncleProcessing: metrics.NewTimer(),
+ tTxAndRecProcessing: metrics.NewTimer(),
+ tStateStoreCodeProcessing: metrics.NewTimer(),
+ }
+ subsys := "indexer"
+ reg.Register(metricName(subsys, "blocks"), ctx.blocks)
+ reg.Register(metricName(subsys, "transactions"), ctx.transactions)
+ reg.Register(metricName(subsys, "receipts"), ctx.receipts)
+ reg.Register(metricName(subsys, "logs"), ctx.logs)
+ reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
+ reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
+ reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
+ reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
+ reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
+ reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
+ reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
+ return ctx
+}
diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go
new file mode 100644
index 000000000..5ee169229
--- /dev/null
+++ b/statediff/indexer/database/file/writer.go
@@ -0,0 +1,248 @@
+// VulcanizeDB
+// Copyright © 2019 Vulcanize
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+package file
+
+import (
+ "fmt"
+ "os"
+
+ blockstore "github.com/ipfs/go-ipfs-blockstore"
+ dshelp "github.com/ipfs/go-ipfs-ds-help"
+ node "github.com/ipfs/go-ipld-format"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
+ "github.com/ethereum/go-ethereum/statediff/indexer/models"
+ nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
+)
+
+var (
+ nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
+ collatedStmtSize = 65536 // min(linuxPipeSize, macOSPipeSize)
+)
+
+// SQLWriter writes sql statements to a file
+type SQLWriter struct {
+ file *os.File
+ stmts chan []byte
+ collatedStmt []byte
+ collationIndex int
+
+ quitChan chan struct{}
+ doneChan chan struct{}
+}
+
+// NewSQLWriter creates a new pointer to a Writer
+func NewSQLWriter(file *os.File) *SQLWriter {
+ return &SQLWriter{
+ file: file,
+ stmts: make(chan []byte),
+ collatedStmt: make([]byte, collatedStmtSize),
+ quitChan: make(chan struct{}),
+ doneChan: make(chan struct{}),
+ }
+}
+
+// Loop enables concurrent writes to the underlying os.File
+// since os.File does not buffer, it utilizes an internal buffer that is the size of a unix pipe
+// by using copy() and tracking the index/size of the buffer, we require only the initial memory allocation
+func (sqw *SQLWriter) Loop() {
+ sqw.collationIndex = 0
+ go func() {
+ defer close(sqw.doneChan)
+ var l int
+ for {
+ select {
+ case stmt := <-sqw.stmts:
+ l = len(stmt)
+ if l+sqw.collationIndex > collatedStmtSize {
+ if err := sqw.flush(); err != nil {
+ log.Error("error writing cached sql stmts to file", "err", err)
+ }
+ }
+ copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l], stmt)
+ sqw.collationIndex += l
+ case <-sqw.quitChan:
+ if err := sqw.flush(); err != nil {
+ log.Error("error writing cached sql stmts to file", "err", err)
+ }
+ return
+ }
+ }
+ }()
+}
+
+// Close satisfies io.Closer
+func (sqw *SQLWriter) Close() error {
+ close(sqw.quitChan)
+ <-sqw.doneChan
+ return nil
+}
+
+func (sqw *SQLWriter) flush() error {
+ if _, err := sqw.file.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil {
+ return err
+ }
+ sqw.collationIndex = 0
+ return nil
+}
+
+const (
+ nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d)
+ ON CONFLICT (node_id) DO NOTHING;` + "\n"
+
+ ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;` + "\n"
+
+ headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
+VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
+ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);` + "\n"
+
+ headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
+VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
+ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);` + "\n"
+
+ uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s)
+ON CONFLICT (block_hash) DO NOTHING;` + "\n"
+
+ txInsert = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d)
+ON CONFLICT (tx_hash) DO NOTHING;` + "\n"
+
+ alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s)
+ON CONFLICT (tx_id, index) DO NOTHING;` + "\n"
+
+ rctInsert = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s)
+ON CONFLICT (tx_id) DO NOTHING;` + "\n"
+
+ logInsert = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s)
+ON CONFLICT (rct_id, index) DO NOTHING;` + "\n"
+
+ stateInsert = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s)
+ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);` + "\n"
+
+ accountInsert = `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s)
+ON CONFLICT (header_id, state_path) DO NOTHING;` + "\n"
+
+ storageInsert = `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s)
+ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);` + "\n"
+)
+
+// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n`
+
+func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
+ sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID))
+}
+
+func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data))
+}
+
+func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) {
+ sqw.upsertIPLD(models.IPLDModel{
+ Key: key,
+ Data: value,
+ })
+}
+
+func (sqw *SQLWriter) upsertIPLDNode(i node.Node) {
+ sqw.upsertIPLD(models.IPLDModel{
+ Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
+ Data: i.RawData(),
+ })
+}
+
+func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) {
+ c, err := ipld.RawdataToCid(codec, raw, mh)
+ if err != nil {
+ return "", "", err
+ }
+ prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
+ sqw.upsertIPLD(models.IPLDModel{
+ Key: prefixedKey,
+ Data: raw,
+ })
+ return c.String(), prefixedKey, err
+}
+
+func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
+ var stmt string
+ if header.BaseFee == nil {
+ stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
+ header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
+ header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1,
+ header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
+ header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey)
+ } else {
+ stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
+ header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
+ header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee,
+ header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
+ header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, header.BaseFee)
+ }
+ sqw.stmts <- []byte(stmt)
+ indexerMetrics.blocks.Inc(1)
+}
+
+func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey))
+}
+
+func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type))
+ indexerMetrics.transactions.Inc(1)
+}
+
+func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys)))
+ indexerMetrics.accessListEntries.Inc(1)
+}
+
+func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot))
+ indexerMetrics.receipts.Inc(1)
+}
+
+func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
+ for _, l := range logs {
+ sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data))
+ indexerMetrics.logs.Inc(1)
+ }
+}
+
+func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
+ var stateKey string
+ if stateNode.StateKey != nullHash.String() {
+ stateKey = stateNode.StateKey
+ }
+ sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType,
+ true, stateNode.MhKey, stateKey, stateNode.CID, stateNode.NodeType, true, stateNode.MhKey))
+}
+
+func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
+ sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
+ stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot))
+}
+
+func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
+ var storageKey string
+ if storageCID.StorageKey != nullHash.String() {
+ storageKey = storageCID.StorageKey
+ }
+ sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
+ storageCID.Path, storageCID.NodeType, true, storageCID.MhKey, storageKey, storageCID.CID, storageCID.NodeType,
+ true, storageCID.MhKey))
+}
diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go
index ff847eec6..fb1b289a1 100644
--- a/statediff/indexer/database/sql/batch_tx.go
+++ b/statediff/indexer/database/sql/batch_tx.go
@@ -34,7 +34,6 @@ type BatchTx struct {
BlockNumber uint64
ctx context.Context
dbtx Tx
- headerID int64
stm string
quit chan struct{}
iplds chan models.IPLDModel
diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go
index fad68bf96..b557ec903 100644
--- a/statediff/indexer/database/sql/indexer.go
+++ b/statediff/indexer/database/sql/indexer.go
@@ -187,7 +187,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// Publish and index header, collect headerID
- var headerID int64
+ var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil {
return nil, err
@@ -227,13 +227,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
t = time.Now()
- blockTx.headerID = headerID
return blockTx, err
}
// processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
-func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
+func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)
var baseFee *int64
@@ -241,14 +240,14 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
baseFee = new(int64)
*baseFee = header.BaseFee.Int64()
}
-
+ headerID := header.Hash().String()
// index header
- return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
+ return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
BlockNumber: header.Number.String(),
- BlockHash: header.Hash().String(),
+ BlockHash: headerID,
TotalDifficulty: td.String(),
Reward: reward.String(),
Bloom: header.Bloom.Bytes(),
@@ -262,7 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
}
// processUncles publishes and indexes uncle IPLDs in Postgres
-func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
+func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
tx.cacheIPLD(uncleNode)
@@ -274,13 +273,14 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
}
uncle := models.UncleModel{
+ HeaderID: headerID,
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
- if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil {
+ if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle); err != nil {
return err
}
}
@@ -289,7 +289,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
// processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct {
- headerID int64
+ headerID string
blockNumber *big.Int
receipts types.Receipts
txs types.Transactions
@@ -313,63 +313,26 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
txNode := args.txNodes[i]
tx.cacheIPLD(txNode)
- // Indexing
- // extract topic and contract data from the receipt for indexing
- mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
- logDataSet := make([]*models.LogsModel, len(receipt.Logs))
- for idx, l := range receipt.Logs {
- topicSet := make([]string, 4)
- for ti, topic := range l.Topics {
- topicSet[ti] = topic.Hex()
- }
-
- if !args.logLeafNodeCIDs[i][idx].Defined() {
- return fmt.Errorf("invalid log cid")
- }
-
- mappedContracts[l.Address.String()] = true
- logDataSet[idx] = &models.LogsModel{
- Address: l.Address.String(),
- Index: int64(l.Index),
- Data: l.Data,
- LeafCID: args.logLeafNodeCIDs[i][idx].String(),
- LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
- Topic0: topicSet[0],
- Topic1: topicSet[1],
- Topic2: topicSet[2],
- Topic3: topicSet[3],
- }
- }
- // these are the contracts seen in the logs
- logContracts := make([]string, 0, len(mappedContracts))
- for addr := range mappedContracts {
- logContracts = append(logContracts, addr)
- }
- // this is the contract address if this receipt is for a contract creation tx
- contract := shared.HandleZeroAddr(receipt.ContractAddress)
- var contractHash string
- if contract != "" {
- contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
- }
- // index tx first so that the receipt can reference it by FK
+ // index tx
trx := args.txs[i]
+ txID := trx.Hash().String()
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
- Dst: shared.HandleZeroAddrPointer(trx.To()),
- Src: shared.HandleZeroAddr(from),
- TxHash: trx.Hash().String(),
- Index: int64(i),
- Data: trx.Data(),
- CID: txNode.Cid().String(),
- MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
- Type: trx.Type(),
+ HeaderID: args.headerID,
+ Dst: shared.HandleZeroAddrPointer(trx.To()),
+ Src: shared.HandleZeroAddr(from),
+ TxHash: txID,
+ Index: int64(i),
+ Data: trx.Data(),
+ CID: txNode.Cid().String(),
+ MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
+ Type: trx.Type(),
}
- txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID)
- if err != nil {
+ if err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel); err != nil {
return err
}
@@ -380,21 +343,30 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
storageKeys[k] = storageKey.Hex()
}
accessListElementModel := models.AccessListElementModel{
+ TxID: txID,
Index: int64(j),
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
}
- if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil {
+ if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel); err != nil {
return err
}
}
- // index the receipt
+ // this is the contract address if this receipt is for a contract creation tx
+ contract := shared.HandleZeroAddr(receipt.ContractAddress)
+ var contractHash string
+ if contract != "" {
+ contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
+ }
+
+ // index receipt
if !args.rctLeafNodeCIDs[i].Defined() {
return fmt.Errorf("invalid receipt leaf node cid")
}
rctModel := &models.ReceiptModel{
+ TxID: txID,
Contract: contract,
ContractHash: contractHash,
LeafCID: args.rctLeafNodeCIDs[i].String(),
@@ -407,12 +379,37 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
}
- receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID)
- if err != nil {
+ if err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel); err != nil {
return err
}
- if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil {
+ // index logs
+ logDataSet := make([]*models.LogsModel, len(receipt.Logs))
+ for idx, l := range receipt.Logs {
+ topicSet := make([]string, 4)
+ for ti, topic := range l.Topics {
+ topicSet[ti] = topic.Hex()
+ }
+
+ if !args.logLeafNodeCIDs[i][idx].Defined() {
+ return fmt.Errorf("invalid log cid")
+ }
+
+ logDataSet[idx] = &models.LogsModel{
+ ReceiptID: txID,
+ Address: l.Address.String(),
+ Index: int64(l.Index),
+ Data: l.Data,
+ LeafCID: args.logLeafNodeCIDs[i][idx].String(),
+ LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
+ Topic0: topicSet[0],
+ Topic1: topicSet[1],
+ Topic2: topicSet[2],
+ Topic3: topicSet[3],
+ }
+ }
+
+ if err := sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet); err != nil {
return err
}
}
@@ -427,7 +424,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
}
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD sql
-func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode) error {
+func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("sql batch is expected to be of type %T, got %T", &BatchTx{}, batch)
@@ -437,29 +434,29 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{
+ HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
- _, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
- return err
+ return sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel)
}
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
+ HeaderID: headerID,
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
}
- // index the state node, collect the stateID to reference by FK
- stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
- if err != nil {
+ // index the state node
+ if err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel); err != nil {
return err
}
// if we have a leaf, decode and index the account data
@@ -476,12 +473,14 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
}
accountModel := models.StateAccountModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Balance: account.Balance.String(),
Nonce: account.Nonce,
CodeHash: account.CodeHash,
StorageRoot: account.Root.String(),
}
- if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil {
+ if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel); err != nil {
return err
}
}
@@ -491,13 +490,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: shared.RemovedNodeStorageCID,
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
- if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
+ if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err
}
continue
@@ -507,13 +508,15 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
+ HeaderID: headerID,
+ StatePath: stateNode.Path,
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
- if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
+ if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel); err != nil {
return err
}
}
@@ -536,7 +539,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
return nil
}
-// Close satisfied io.Closer
+// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dbWriter.db.Close()
}
diff --git a/statediff/indexer/database/sql/interfaces.go b/statediff/indexer/database/sql/interfaces.go
index 3ed1a11e7..445b35d9b 100644
--- a/statediff/indexer/database/sql/interfaces.go
+++ b/statediff/indexer/database/sql/interfaces.go
@@ -36,7 +36,7 @@ type Driver interface {
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Begin(ctx context.Context) (Tx, error)
Stats() Stats
- NodeID() int64
+ NodeID() string
Context() context.Context
io.Closer
}
diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
index d08336e63..21b74b3b2 100644
--- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
+++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go
@@ -20,7 +20,6 @@ import (
"context"
"testing"
- "github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
@@ -53,7 +52,7 @@ func setupLegacyPGX(t *testing.T) {
}
}()
for _, node := range legacyData.StateDiffs {
- err = ind.PushStateNode(tx, node)
+ err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
require.NoError(t, err)
}
@@ -64,20 +63,21 @@ func TestPGXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
- pgStr := `SELECT cid, td, reward, id, base_fee
+ pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
- CID string
- TD string
- Reward string
- ID int
- BaseFee *int64 `db:"base_fee"`
+ CID string
+ TD string
+ Reward string
+ BlockHash string `db:"block_hash"`
+ BaseFee *int64 `db:"base_fee"`
}
header := new(res)
- err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
+ err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan(
+ &header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee)
require.NoError(t, err)
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go
index f63efe712..a86927341 100644
--- a/statediff/indexer/database/sql/pgx_indexer_test.go
+++ b/statediff/indexer/database/sql/pgx_indexer_test.go
@@ -26,7 +26,6 @@ import (
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
- "github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
@@ -140,7 +139,7 @@ func setupPGX(t *testing.T) {
}
}()
for _, node := range mocks.StateDiffs {
- err = ind.PushStateNode(tx, node)
+ err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
if err != nil {
t.Fatal(err)
}
@@ -153,19 +152,24 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
- pgStr := `SELECT cid, td, reward, id, base_fee
+ pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
- CID string
- TD string
- Reward string
- ID int
- BaseFee *int64 `db:"base_fee"`
+ CID string
+ TD string
+ Reward string
+ BlockHash string `db:"block_hash"`
+ BaseFee *int64 `db:"base_fee"`
}
header := new(res)
- err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
+ err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
+ &header.CID,
+ &header.TD,
+ &header.Reward,
+ &header.BlockHash,
+ &header.BaseFee)
if err != nil {
t.Fatal(err)
}
@@ -192,7 +196,7 @@ func TestPGXIndexer(t *testing.T) {
defer tearDown(t)
// check that txs were properly indexed
trxs := make([]string, 0)
- pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
+ pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1`
err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -221,46 +225,46 @@ func TestPGXIndexer(t *testing.T) {
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
- var txType *uint8
+ var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
- if txType != nil {
- t.Fatalf("expected nil tx_type, got %d", *txType)
+ if txType != 0 {
+ t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
- var txType *uint8
+ var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
- if txType != nil {
- t.Fatalf("expected nil tx_type, got %d", *txType)
+ if txType != 0 {
+ t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
- var txType *uint8
+ var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
- if txType != nil {
- t.Fatalf("expected nil tx_type, got %d", *txType)
+ if txType != 0 {
+ t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
- var txType *uint8
+ var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
- if *txType != types.AccessListTxType {
- t.Fatalf("expected AccessListTxType (1), got %d", *txType)
+ if txType != types.AccessListTxType {
+ t.Fatalf("expected AccessListTxType (1), got %d", txType)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
- pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC`
+ pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC`
err = db.Select(context.Background(), &accessListElementModels, pgStr, c)
if err != nil {
t.Fatal(err)
@@ -299,8 +303,8 @@ func TestPGXIndexer(t *testing.T) {
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
- WHERE receipt_cids.tx_id = transaction_cids.id
- AND transaction_cids.header_id = header_cids.id
+ WHERE receipt_cids.tx_id = transaction_cids.tx_hash
+ AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
@@ -317,8 +321,8 @@ func TestPGXIndexer(t *testing.T) {
}
for i := range rcts {
results := make([]logIPLD, 0)
- pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids
- INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id)
+ pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids
+ INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id)
INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key)
WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC`
err = db.Select(context.Background(), &results, pgStr, rcts[i])
@@ -350,9 +354,9 @@ func TestPGXIndexer(t *testing.T) {
// check receipts were properly indexed
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
- WHERE receipt_cids.tx_id = transaction_cids.id
- AND transaction_cids.header_id = header_cids.id
- AND header_cids.block_number = $1 order by transaction_cids.id`
+ WHERE receipt_cids.tx_id = transaction_cids.tx_hash
+ AND transaction_cids.header_id = header_cids.block_hash
+ AND header_cids.block_number = $1 order by transaction_cids.index`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
@@ -447,8 +451,8 @@ func TestPGXIndexer(t *testing.T) {
defer tearDown(t)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
- pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
- FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
+ pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
+ FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1 AND node_type != 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -467,9 +471,9 @@ func TestPGXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1`
+ pgStr = `SELECT header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2`
var account models.StateAccountModel
- err = db.Get(context.Background(), &account, pgStr, stateNode.ID)
+ err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path)
if err != nil {
t.Fatal(err)
}
@@ -479,8 +483,8 @@ func TestPGXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'})
test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
- ID: account.ID,
- StateID: stateNode.ID,
+ HeaderID: account.HeaderID,
+ StatePath: stateNode.Path,
Balance: "0",
CodeHash: mocks.ContractCodeHash.Bytes(),
StorageRoot: mocks.ContractRoot,
@@ -493,8 +497,8 @@ func TestPGXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'})
test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
- ID: account.ID,
- StateID: stateNode.ID,
+ HeaderID: account.HeaderID,
+ StatePath: stateNode.Path,
Balance: "1000",
CodeHash: mocks.AccountCodeHash.Bytes(),
StorageRoot: mocks.AccountRoot,
@@ -505,8 +509,8 @@ func TestPGXIndexer(t *testing.T) {
// check that Removed state nodes were properly indexed and published
stateNodes = make([]models.StateNodeModel, 0)
- pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
- FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
+ pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
+ FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1 AND node_type = 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -538,8 +542,8 @@ func TestPGXIndexer(t *testing.T) {
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
- WHERE storage_cids.state_id = state_cids.id
- AND state_cids.header_id = header_cids.id
+ WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
+ AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
@@ -571,8 +575,8 @@ func TestPGXIndexer(t *testing.T) {
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
- WHERE storage_cids.state_id = state_cids.id
- AND state_cids.header_id = header_cids.id
+ WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
+ AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type = 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go
index a7c7cc9b4..5794bd0af 100644
--- a/statediff/indexer/database/sql/postgres/config.go
+++ b/statediff/indexer/database/sql/postgres/config.go
@@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
var DefaultConfig = Config{
Hostname: "localhost",
Port: 5432,
- DatabaseName: "vulcanize_test",
+ DatabaseName: "vulcanize_testing",
Username: "postgres",
- Password: "",
+ Password: "password",
}
// Config holds params for a Postgres db
diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go
index 3fe7f652e..213638017 100644
--- a/statediff/indexer/database/sql/postgres/database.go
+++ b/statediff/indexer/database/sql/postgres/database.go
@@ -22,14 +22,7 @@ var _ sql.Database = &DB{}
const (
createNodeStm = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5)
- ON CONFLICT (genesis_block, network_id, node_id, chain_id)
- DO UPDATE
- SET genesis_block = $1,
- network_id = $2,
- node_id = $3,
- client_name = $4,
- chain_id = $5
- RETURNING id`
+ ON CONFLICT (node_id) DO NOTHING`
)
// NewPostgresDB returns a postgres.DB using the provided driver
@@ -37,7 +30,7 @@ func NewPostgresDB(driver sql.Driver) *DB {
return &DB{driver}
}
-// DB implements sql.Databse using a configured driver and Postgres statement syntax
+// DB implements sql.Database using a configured driver and Postgres statement syntax
type DB struct {
sql.Driver
}
@@ -46,59 +39,55 @@ type DB struct {
func (db *DB) InsertHeaderStm() string {
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
- ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
- RETURNING id`
+ ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
}
// InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string {
return `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
- ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`
+ ON CONFLICT (block_hash) DO NOTHING`
}
// InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string {
return `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
- ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9)
- RETURNING id`
+ ON CONFLICT (tx_hash) DO NOTHING`
}
// InsertAccessListElementStm satisfies the sql.Statements interface
func (db *DB) InsertAccessListElementStm() string {
return `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
- ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)`
+ ON CONFLICT (tx_id, index) DO NOTHING`
}
// InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string {
return `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
- ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8)
- RETURNING id`
+ ON CONFLICT (tx_id) DO NOTHING`
}
// InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string {
- return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
- ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)`
+ return `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ ON CONFLICT (rct_id, index) DO NOTHING`
}
// InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string {
return `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
- ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
- RETURNING id`
+ ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`
}
// InsertAccountStm satisfies the sql.Statements interface
func (db *DB) InsertAccountStm() string {
- return `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5)
- ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`
+ return `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
+ ON CONFLICT (header_id, state_path) DO NOTHING`
}
// InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string {
- return `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
- ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`
+ return `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)`
}
// InsertIPLDStm satisfies the sql.Statements interface
diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go
index 9f6701400..936a3765d 100644
--- a/statediff/indexer/database/sql/postgres/pgx.go
+++ b/statediff/indexer/database/sql/postgres/pgx.go
@@ -34,7 +34,7 @@ type PGXDriver struct {
ctx context.Context
pool *pgxpool.Pool
nodeInfo node.Info
- nodeID int64
+ nodeID string
}
// NewPGXDriver returns a new pgx driver
@@ -89,17 +89,16 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
}
func (pgx *PGXDriver) createNode() error {
- var nodeID int64
- err := pgx.pool.QueryRow(
+ _, err := pgx.pool.Exec(
pgx.ctx,
createNodeStm,
pgx.nodeInfo.GenesisBlock, pgx.nodeInfo.NetworkID,
pgx.nodeInfo.ID, pgx.nodeInfo.ClientName,
- pgx.nodeInfo.ChainID).Scan(&nodeID)
+ pgx.nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
- pgx.nodeID = nodeID
+ pgx.nodeID = pgx.nodeInfo.ID
return nil
}
@@ -138,13 +137,8 @@ func (pgx *PGXDriver) Stats() sql.Stats {
return pgxStatsWrapper{stats: stats}
}
-// NodeInfo satisfies sql.Database
-func (pgx *PGXDriver) NodeInfo() node.Info {
- return pgx.nodeInfo
-}
-
// NodeID satisfies sql.Database
-func (pgx *PGXDriver) NodeID() int64 {
+func (pgx *PGXDriver) NodeID() string {
return pgx.nodeID
}
diff --git a/statediff/indexer/database/sql/postgres/sqlx.go b/statediff/indexer/database/sql/postgres/sqlx.go
index 684fc7bf0..406b44a19 100644
--- a/statediff/indexer/database/sql/postgres/sqlx.go
+++ b/statediff/indexer/database/sql/postgres/sqlx.go
@@ -32,7 +32,7 @@ type SQLXDriver struct {
ctx context.Context
db *sqlx.DB
nodeInfo node.Info
- nodeID int64
+ nodeID string
}
// NewSQLXDriver returns a new sqlx driver for Postgres
@@ -60,16 +60,15 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri
}
func (driver *SQLXDriver) createNode() error {
- var nodeID int64
- err := driver.db.QueryRowx(
+ _, err := driver.db.Exec(
createNodeStm,
driver.nodeInfo.GenesisBlock, driver.nodeInfo.NetworkID,
driver.nodeInfo.ID, driver.nodeInfo.ClientName,
- driver.nodeInfo.ChainID).Scan(&nodeID)
+ driver.nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
- driver.nodeID = nodeID
+ driver.nodeID = driver.nodeInfo.ID
return nil
}
@@ -107,13 +106,8 @@ func (driver *SQLXDriver) Stats() sql.Stats {
return sqlxStatsWrapper{stats: stats}
}
-// NodeInfo satisfies sql.Database
-func (driver *SQLXDriver) NodeInfo() node.Info {
- return driver.nodeInfo
-}
-
// NodeID satisfies sql.Database
-func (driver *SQLXDriver) NodeID() int64 {
+func (driver *SQLXDriver) NodeID() string {
return driver.nodeID
}
diff --git a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go
index 2ce5f494f..4349850ed 100644
--- a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go
+++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go
@@ -62,7 +62,7 @@ func setupLegacySQLX(t *testing.T) {
}
}()
for _, node := range legacyData.StateDiffs {
- err = ind.PushStateNode(tx, node)
+ err = ind.PushStateNode(tx, node, mockLegacyBlock.Hash().String())
require.NoError(t, err)
}
@@ -73,16 +73,16 @@ func TestSQLXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
setupLegacySQLX(t)
defer tearDown(t)
- pgStr := `SELECT cid, td, reward, id, base_fee
+ pgStr := `SELECT cid, td, reward, block_hash, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
- CID string
- TD string
- Reward string
- ID int
- BaseFee *int64 `db:"base_fee"`
+ CID string
+ TD string
+ Reward string
+ BlockHash string `db:"block_hash"`
+ BaseFee *int64 `db:"base_fee"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go
index 0fa4e8c1a..09ee62fa3 100644
--- a/statediff/indexer/database/sql/sqlx_indexer_test.go
+++ b/statediff/indexer/database/sql/sqlx_indexer_test.go
@@ -159,7 +159,7 @@ func setupSQLX(t *testing.T) {
}
}()
for _, node := range mocks.StateDiffs {
- err = ind.PushStateNode(tx, node)
+ err = ind.PushStateNode(tx, node, mockBlock.Hash().String())
if err != nil {
t.Fatal(err)
}
@@ -179,16 +179,16 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
- pgStr := `SELECT cid, td, reward, id, base_fee
+ pgStr := `SELECT cid, td, reward, block_hash, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
- CID string
- TD string
- Reward string
- ID int
- BaseFee *int64 `db:"base_fee"`
+ CID string
+ TD string
+ Reward string
+ BlockHash string `db:"block_hash"`
+ BaseFee *int64 `db:"base_fee"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
@@ -218,7 +218,7 @@ func TestSQLXIndexer(t *testing.T) {
defer tearDown(t)
// check that txs were properly indexed
trxs := make([]string, 0)
- pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
+ pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1`
err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -286,7 +286,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
- pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC`
+ pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC`
err = db.Select(context.Background(), &accessListElementModels, pgStr, c)
if err != nil {
t.Fatal(err)
@@ -325,8 +325,8 @@ func TestSQLXIndexer(t *testing.T) {
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
- WHERE receipt_cids.tx_id = transaction_cids.id
- AND transaction_cids.header_id = header_cids.id
+ WHERE receipt_cids.tx_id = transaction_cids.tx_hash
+ AND transaction_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
@@ -343,8 +343,8 @@ func TestSQLXIndexer(t *testing.T) {
}
for i := range rcts {
results := make([]logIPLD, 0)
- pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids
- INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id)
+ pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids
+ INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id)
INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key)
WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC`
err = db.Select(context.Background(), &results, pgStr, rcts[i])
@@ -376,9 +376,9 @@ func TestSQLXIndexer(t *testing.T) {
// check receipts were properly indexed
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
- WHERE receipt_cids.tx_id = transaction_cids.id
- AND transaction_cids.header_id = header_cids.id
- AND header_cids.block_number = $1 order by transaction_cids.id`
+ WHERE receipt_cids.tx_id = transaction_cids.tx_hash
+ AND transaction_cids.header_id = header_cids.block_hash
+ AND header_cids.block_number = $1 order by transaction_cids.index`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
@@ -472,8 +472,8 @@ func TestSQLXIndexer(t *testing.T) {
defer tearDown(t)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
- pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
- FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
+ pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
+ FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1 AND node_type != 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -492,9 +492,9 @@ func TestSQLXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1`
+ pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2`
var account models.StateAccountModel
- err = db.Get(context.Background(), &account, pgStr, stateNode.ID)
+ err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path)
if err != nil {
t.Fatal(err)
}
@@ -504,8 +504,8 @@ func TestSQLXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'})
test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
- ID: account.ID,
- StateID: stateNode.ID,
+ HeaderID: account.HeaderID,
+ StatePath: stateNode.Path,
Balance: "0",
CodeHash: mocks.ContractCodeHash.Bytes(),
StorageRoot: mocks.ContractRoot,
@@ -518,8 +518,8 @@ func TestSQLXIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'})
test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
- ID: account.ID,
- StateID: stateNode.ID,
+ HeaderID: account.HeaderID,
+ StatePath: stateNode.Path,
Balance: "1000",
CodeHash: mocks.AccountCodeHash.Bytes(),
StorageRoot: mocks.AccountRoot,
@@ -530,8 +530,8 @@ func TestSQLXIndexer(t *testing.T) {
// check that Removed state nodes were properly indexed and published
stateNodes = make([]models.StateNodeModel, 0)
- pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
- FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
+ pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
+ FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1 AND node_type = 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
@@ -563,8 +563,8 @@ func TestSQLXIndexer(t *testing.T) {
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
- WHERE storage_cids.state_id = state_cids.id
- AND state_cids.header_id = header_cids.id
+ WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
+ AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
@@ -596,8 +596,8 @@ func TestSQLXIndexer(t *testing.T) {
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
- WHERE storage_cids.state_id = state_cids.id
- AND state_cids.header_id = header_cids.id
+ WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id)
+ AND state_cids.header_id = header_cids.block_hash
AND header_cids.block_number = $1
AND storage_cids.node_type = 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go
index ea276dfbf..94b38c7e1 100644
--- a/statediff/indexer/database/sql/writer.go
+++ b/statediff/indexer/database/sql/writer.go
@@ -39,41 +39,56 @@ func NewWriter(db Database) *Writer {
}
}
-func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) (int64, error) {
- var headerID int64
- err := tx.QueryRow(in.db.Context(), in.db.InsertHeaderStm(),
+/*
+INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
+VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
+ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)
+*/
+func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
+ _, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(),
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot,
- header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee).Scan(&headerID)
+ header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee)
if err != nil {
- return 0, fmt.Errorf("error upserting header_cids entry: %v", err)
+ return fmt.Errorf("error upserting header_cids entry: %v", err)
}
indexerMetrics.blocks.Inc(1)
- return headerID, nil
+ return nil
}
-func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel, headerID int64) error {
+/*
+INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
+ON CONFLICT (block_hash) DO NOTHING
+*/
+func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(),
- uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
+ uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
if err != nil {
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
}
return nil
}
-func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel, headerID int64) (int64, error) {
- var txID int64
- err := tx.QueryRow(in.db.Context(), in.db.InsertTxStm(),
- headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID)
+/*
+INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+ON CONFLICT (tx_hash) DO NOTHING
+*/
+func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
+ _, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(),
+ transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type)
if err != nil {
- return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err)
+ return fmt.Errorf("error upserting transaction_cids entry: %v", err)
}
indexerMetrics.transactions.Inc(1)
- return txID, nil
+ return nil
}
-func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel, txID int64) error {
+/*
+INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4)
+ON CONFLICT (tx_id, index) DO NOTHING
+*/
+func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(),
- txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
+ accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys)
if err != nil {
return fmt.Errorf("error upserting access_list_element entry: %v", err)
}
@@ -81,21 +96,28 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access
return nil
}
-func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel, txID int64) (int64, error) {
- var receiptID int64
- err := tx.QueryRow(in.db.Context(), in.db.InsertRctStm(),
- txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID)
+/*
+INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ON CONFLICT (tx_id) DO NOTHING
+*/
+func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
+ _, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(),
+ rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot)
if err != nil {
- return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err)
+ return fmt.Errorf("error upserting receipt_cids entry: %w", err)
}
indexerMetrics.receipts.Inc(1)
- return receiptID, nil
+ return nil
}
-func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64) error {
+/*
+INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ON CONFLICT (rct_id, index) DO NOTHING
+*/
+func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
for _, log := range logs {
_, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(),
- log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
+ log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err)
}
@@ -104,36 +126,47 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel, receiptID int64)
return nil
}
-func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) {
- var stateID int64
+/*
+INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
+ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)
+*/
+func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
var stateKey string
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
- err := tx.QueryRow(in.db.Context(), in.db.InsertStateStm(),
- headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
+ _, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(),
+ stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey)
if err != nil {
- return 0, fmt.Errorf("error upserting state_cids entry: %v", err)
+ return fmt.Errorf("error upserting state_cids entry: %v", err)
}
- return stateID, nil
+ return nil
}
-func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel, stateID int64) error {
+/*
+INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6)
+ON CONFLICT (header_id, state_path) DO NOTHING
+*/
+func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error {
_, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(),
- stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
+ stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
if err != nil {
return fmt.Errorf("error upserting state_accounts entry: %v", err)
}
return nil
}
-func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel, stateID int64) error {
+/*
+INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8)
+*/
+func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error {
var storageKey string
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
_, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(),
- stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
+ storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
if err != nil {
return fmt.Errorf("error upserting storage_cids entry: %v", err)
}
diff --git a/statediff/indexer/interfaces/interfaces.go b/statediff/indexer/interfaces/interfaces.go
index d32c117eb..8f951230d 100644
--- a/statediff/indexer/interfaces/interfaces.go
+++ b/statediff/indexer/interfaces/interfaces.go
@@ -29,7 +29,7 @@ import (
// StateDiffIndexer interface required to index statediff data
type StateDiffIndexer interface {
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
- PushStateNode(tx Batch, stateNode sdtypes.StateNode) error
+ PushStateNode(tx Batch, stateNode sdtypes.StateNode, headerID string) error
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
io.Closer
diff --git a/statediff/indexer/models/batch.go b/statediff/indexer/models/batch.go
index 48b2944e0..16096f292 100644
--- a/statediff/indexer/models/batch.go
+++ b/statediff/indexer/models/batch.go
@@ -26,7 +26,7 @@ type IPLDBatch struct {
// UncleBatch holds the arguments for a batch insert of uncle data
type UncleBatch struct {
- HeaderID []int64
+ HeaderID []string
BlockHashes []string
ParentHashes []string
CIDs []string
@@ -36,7 +36,7 @@ type UncleBatch struct {
// TxBatch holds the arguments for a batch insert of tx data
type TxBatch struct {
- HeaderID int64
+ HeaderID string
Indexes []int64
TxHashes []string
CIDs []string
@@ -44,20 +44,20 @@ type TxBatch struct {
Dsts []string
Srcs []string
Datas [][]byte
- Types []*uint8
+ Types []uint8
}
// AccessListBatch holds the arguments for a batch insert of access list data
type AccessListBatch struct {
Indexes []int64
- TxIDs []int64
+ TxIDs []string
Addresses []string
StorageKeysSets []pq.StringArray
}
// ReceiptBatch holds the arguments for a batch insert of receipt data
type ReceiptBatch struct {
- TxIDs []int64
+ TxIDs []string
LeafCIDs []string
LeafMhKeys []string
PostStatuses []uint64
@@ -71,7 +71,7 @@ type ReceiptBatch struct {
type LogBatch struct {
LeafCIDs []string
LeafMhKeys []string
- ReceiptIDs []int64
+ ReceiptIDs []string
Addresses []string
Indexes []int64
Datas [][]byte
@@ -83,34 +83,33 @@ type LogBatch struct {
// StateBatch holds the arguments for a batch insert of state data
type StateBatch struct {
- ID int64
- HeaderID int64
- Path []byte
- StateKey string
- NodeType int
- CID string
- MhKey string
- Diff bool
+ HeaderID string
+ Paths [][]byte
+ StateKeys []string
+ NodeTypes []int
+ CIDs []string
+ MhKeys []string
+ Diff bool
}
// AccountBatch holds the arguments for a batch insert of account data
type AccountBatch struct {
- ID int64
- StateID int64
- Balance string
- Nonce uint64
- CodeHash []byte
- StorageRoot string
+ HeaderID string
+ StatePaths [][]byte
+ Balances []string
+ Nonces []uint64
+ CodeHashes [][]byte
+ StorageRoots []string
}
// StorageBatch holds the arguments for a batch insert of storage data
type StorageBatch struct {
- ID int64
- StateID int64
- Path []byte
- StorageKey string
- NodeType int
- CID string
- MhKey string
- Diff bool
+ HeaderID string
+ StatePaths [][]byte
+ Paths [][]byte
+ StorageKeys []string
+ NodeTypes []int
+ CIDs []string
+ MhKeys []string
+ Diff bool
}
diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go
index 5e849193e..d37aa5449 100644
--- a/statediff/indexer/models/models.go
+++ b/statediff/indexer/models/models.go
@@ -26,14 +26,13 @@ type IPLDModel struct {
// HeaderModel is the db model for eth.header_cids
type HeaderModel struct {
- ID int64 `db:"id"`
BlockNumber string `db:"block_number"`
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
TotalDifficulty string `db:"td"`
- NodeID int64 `db:"node_id"`
+ NodeID string `db:"node_id"`
Reward string `db:"reward"`
StateRoot string `db:"state_root"`
UncleRoot string `db:"uncle_root"`
@@ -47,8 +46,7 @@ type HeaderModel struct {
// UncleModel is the db model for eth.uncle_cids
type UncleModel struct {
- ID int64 `db:"id"`
- HeaderID int64 `db:"header_id"`
+ HeaderID string `db:"header_id"`
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
@@ -58,8 +56,7 @@ type UncleModel struct {
// TxModel is the db model for eth.transaction_cids
type TxModel struct {
- ID int64 `db:"id"`
- HeaderID int64 `db:"header_id"`
+ HeaderID string `db:"header_id"`
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
@@ -72,17 +69,15 @@ type TxModel struct {
// AccessListElementModel is the db model for eth.access_list_entry
type AccessListElementModel struct {
- ID int64 `db:"id"`
Index int64 `db:"index"`
- TxID int64 `db:"tx_id"`
+ TxID string `db:"tx_id"`
Address string `db:"address"`
StorageKeys pq.StringArray `db:"storage_keys"`
}
// ReceiptModel is the db model for eth.receipt_cids
type ReceiptModel struct {
- ID int64 `db:"id"`
- TxID int64 `db:"tx_id"`
+ TxID string `db:"tx_id"`
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
PostStatus uint64 `db:"post_status"`
@@ -94,8 +89,7 @@ type ReceiptModel struct {
// StateNodeModel is the db model for eth.state_cids
type StateNodeModel struct {
- ID int64 `db:"id"`
- HeaderID int64 `db:"header_id"`
+ HeaderID string `db:"header_id"`
Path []byte `db:"state_path"`
StateKey string `db:"state_leaf_key"`
NodeType int `db:"node_type"`
@@ -106,8 +100,8 @@ type StateNodeModel struct {
// StorageNodeModel is the db model for eth.storage_cids
type StorageNodeModel struct {
- ID int64 `db:"id"`
- StateID int64 `db:"state_id"`
+ HeaderID string `db:"header_id"`
+ StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"`
StorageKey string `db:"storage_leaf_key"`
NodeType int `db:"node_type"`
@@ -118,8 +112,8 @@ type StorageNodeModel struct {
// StorageNodeWithStateKeyModel is a db model for eth.storage_cids + eth.state_cids.state_key
type StorageNodeWithStateKeyModel struct {
- ID int64 `db:"id"`
- StateID int64 `db:"state_id"`
+ HeaderID string `db:"header_id"`
+ StatePath []byte `db:"state_path"`
Path []byte `db:"storage_path"`
StateKey string `db:"state_leaf_key"`
StorageKey string `db:"storage_leaf_key"`
@@ -131,8 +125,8 @@ type StorageNodeWithStateKeyModel struct {
// StateAccountModel is a db model for an eth state account (decoded value of state leaf node)
type StateAccountModel struct {
- ID int64 `db:"id"`
- StateID int64 `db:"state_id"`
+ HeaderID string `db:"header_id"`
+ StatePath []byte `db:"state_path"`
Balance string `db:"balance"`
Nonce uint64 `db:"nonce"`
CodeHash []byte `db:"code_hash"`
@@ -141,10 +135,9 @@ type StateAccountModel struct {
// LogsModel is the db model for eth.logs
type LogsModel struct {
- ID int64 `db:"id"`
+ ReceiptID string `db:"rct_id"`
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
- ReceiptID int64 `db:"receipt_id"`
Address string `db:"address"`
Index int64 `db:"index"`
Data []byte `db:"log_data"`
diff --git a/statediff/indexer/shared/db_kind.go b/statediff/indexer/shared/db_kind.go
index 6b88164e1..7e7997f95 100644
--- a/statediff/indexer/shared/db_kind.go
+++ b/statediff/indexer/shared/db_kind.go
@@ -27,6 +27,7 @@ type DBType string
const (
POSTGRES DBType = "Postgres"
DUMP DBType = "Dump"
+ FILE DBType = "File"
UNKNOWN DBType = "Unknown"
)
@@ -37,6 +38,8 @@ func ResolveDBType(str string) (DBType, error) {
return POSTGRES, nil
case "dump", "d":
return DUMP, nil
+ case "file", "f", "fs":
+ return FILE, nil
default:
return UNKNOWN, fmt.Errorf("unrecognized db type string: %s", str)
}
diff --git a/statediff/service.go b/statediff/service.go
index 31a56b809..04aaac458 100644
--- a/statediff/service.go
+++ b/statediff/service.go
@@ -239,8 +239,14 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case err := <-errCh:
+
log.Error("Error from chain event subscription", "error", err)
close(sds.QuitChan)
+ log.Info("Quitting the statediffing writing loop")
+ if err := sds.indexer.Close(); err != nil {
+ log.Error("Error closing indexer", "err", err)
+ }
+ return
case <-sds.QuitChan:
log.Info("Quitting the statediffing writing loop")
if err := sds.indexer.Close(); err != nil {
@@ -339,6 +345,9 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
case err := <-errCh:
log.Error("Error from chain event subscription", "error", err)
close(sds.QuitChan)
+ log.Info("Quitting the statediffing listening loop")
+ sds.close()
+ return
case <-sds.QuitChan:
log.Info("Quitting the statediffing listening loop")
sds.close()
@@ -664,7 +673,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
}
}()
output := func(node types2.StateNode) error {
- return sds.indexer.PushStateNode(tx, node)
+ return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
codeOutput := func(c types2.CodeAndCodeHash) error {
return sds.indexer.PushCodeAndCodeHash(tx, c)
diff --git a/statediff/test_helpers/mocks/service_test.go b/statediff/test_helpers/mocks/service_test.go
index b3b77d4bf..dde784316 100644
--- a/statediff/test_helpers/mocks/service_test.go
+++ b/statediff/test_helpers/mocks/service_test.go
@@ -24,6 +24,7 @@ import (
"sort"
"sync"
"testing"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -150,29 +151,35 @@ func testSubscriptionAPI(t *testing.T) {
id := rpc.NewID()
payloadChan := make(chan statediff.Payload)
quitChan := make(chan bool)
+ wg := new(sync.WaitGroup)
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] })
+ select {
+ case payload := <-payloadChan:
+ if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) {
+ t.Errorf("payload does not have expected block\r\nactual block rlp: %v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp)
+ }
+ sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] })
+ if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) {
+ t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes)
+ }
+ if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) {
+ t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes)
+ }
+ if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) {
+ t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64())
+ }
+ case <-quitChan:
+ t.Errorf("channel quit before delivering payload")
+ }
+ }()
+ time.Sleep(1)
mockService.Subscribe(id, payloadChan, quitChan, params)
blockChan <- block1
parentBlockChain <- block0
-
- sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] })
- select {
- case payload := <-payloadChan:
- if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) {
- t.Errorf("payload does not have expected block\r\nactual block rlp: %v\r\nexpected block rlp: %v", payload.BlockRlp, expectedBlockRlp)
- }
- sort.Slice(payload.StateObjectRlp, func(i, j int) bool { return payload.StateObjectRlp[i] < payload.StateObjectRlp[j] })
- if !bytes.Equal(payload.StateObjectRlp, expectedStateDiffBytes) {
- t.Errorf("payload does not have expected state diff\r\nactual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateObjectRlp, expectedStateDiffBytes)
- }
- if !bytes.Equal(expectedReceiptBytes, payload.ReceiptsRlp) {
- t.Errorf("payload does not have expected receipts\r\nactual receipt rlp: %v\r\nexpected receipt rlp: %v", payload.ReceiptsRlp, expectedReceiptBytes)
- }
- if !bytes.Equal(payload.TotalDifficulty.Bytes(), mockTotalDifficulty.Bytes()) {
- t.Errorf("payload does not have expected total difficulty\r\nactual td: %d\r\nexpected td: %d", payload.TotalDifficulty.Int64(), mockTotalDifficulty.Int64())
- }
- case <-quitChan:
- t.Errorf("channel quit before delivering payload")
- }
+ wg.Wait()
}
func testHTTPAPI(t *testing.T) {