indexer that writes sql stmts out to a file
This commit is contained in:
parent
b36b3f83cb
commit
81a8ffd257
@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
@ -32,10 +34,16 @@ import (
|
||||
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (interfaces.StateDiffIndexer, error) {
|
||||
switch config.Type() {
|
||||
case shared.FILE:
|
||||
fc, ok := config.(file.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
|
||||
}
|
||||
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
|
||||
case shared.POSTGRES:
|
||||
pgc, ok := config.(postgres.Config)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("ostgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||
return nil, fmt.Errorf("postgres config is not the correct type: got %T, expected %T", config, postgres.Config{})
|
||||
}
|
||||
var err error
|
||||
var driver sql.Driver
|
||||
|
29
statediff/indexer/database/file/batch_tx.go
Normal file
29
statediff/indexer/database/file/batch_tx.go
Normal file
@ -0,0 +1,29 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
|
||||
type BatchTx struct {
|
||||
BlockNumber uint64
|
||||
|
||||
submit func(blockTx *BatchTx, err error) error
|
||||
}
|
||||
|
||||
// Submit satisfies indexer.AtomicTx
|
||||
func (tx *BatchTx) Submit(err error) error {
|
||||
return tx.submit(tx, err)
|
||||
}
|
32
statediff/indexer/database/file/config.go
Normal file
32
statediff/indexer/database/file/config.go
Normal file
@ -0,0 +1,32 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
)
|
||||
|
||||
// Config holds params for writing sql statements out to a file
|
||||
type Config struct {
|
||||
NodeID int64 // this is the nodeID used as FK in public.blocks
|
||||
FilePath string
|
||||
}
|
||||
|
||||
// Type satisfies interfaces.Config
|
||||
func (c Config) Type() shared.DBType {
|
||||
return shared.FILE
|
||||
}
|
60
statediff/indexer/database/file/helpers.go
Normal file
60
statediff/indexer/database/file/helpers.go
Normal file
@ -0,0 +1,60 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import "bytes"
|
||||
|
||||
// formatPostgresStringArray parses an array of strings into the proper Postgres string representation of that array
|
||||
func formatPostgresStringArray(a []string) string {
|
||||
if a == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
if n := len(a); n > 0 {
|
||||
// There will be at least two curly brackets, 2*N bytes of quotes,
|
||||
// and N-1 bytes of delimiters.
|
||||
b := make([]byte, 1, 1+3*n)
|
||||
b[0] = '{'
|
||||
|
||||
b = appendArrayQuotedBytes(b, []byte(a[0]))
|
||||
for i := 1; i < n; i++ {
|
||||
b = append(b, ',')
|
||||
b = appendArrayQuotedBytes(b, []byte(a[i]))
|
||||
}
|
||||
|
||||
return string(append(b, '}'))
|
||||
}
|
||||
|
||||
return "{}"
|
||||
}
|
||||
|
||||
func appendArrayQuotedBytes(b, v []byte) []byte {
|
||||
b = append(b, '"')
|
||||
for {
|
||||
i := bytes.IndexAny(v, `"\`)
|
||||
if i < 0 {
|
||||
b = append(b, v...)
|
||||
break
|
||||
}
|
||||
if i > 0 {
|
||||
b = append(b, v[:i]...)
|
||||
}
|
||||
b = append(b, '\\', v[i])
|
||||
v = v[i+1:]
|
||||
}
|
||||
return append(b, '"')
|
||||
}
|
474
statediff/indexer/database/file/indexer.go
Normal file
474
statediff/indexer/database/file/indexer.go
Normal file
@ -0,0 +1,474 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/multiformats/go-multihash"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
const defaultFilePath = "./statediff.sql"
|
||||
|
||||
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
|
||||
|
||||
var (
|
||||
indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
// StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void
|
||||
type StateDiffIndexer struct {
|
||||
writer *SQLWriter
|
||||
chainConfig *params.ChainConfig
|
||||
nodeID int64
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
|
||||
filePath := config.FilePath
|
||||
if filePath == "" {
|
||||
filePath = defaultFilePath
|
||||
}
|
||||
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
|
||||
}
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
|
||||
}
|
||||
w := NewSQLWriter(file)
|
||||
wg := new(sync.WaitGroup)
|
||||
w.Loop()
|
||||
return &StateDiffIndexer{
|
||||
writer: w,
|
||||
chainConfig: chainConfig,
|
||||
nodeID: config.NodeID,
|
||||
wg: wg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ReportDBMetrics has nothing to report for dump
|
||||
func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
|
||||
|
||||
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
blockHash := block.Hash()
|
||||
blockHashStr := blockHash.String()
|
||||
height := block.NumberU64()
|
||||
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
|
||||
transactions := block.Transactions()
|
||||
// Derive any missing fields
|
||||
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, transactions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Generate the block iplds
|
||||
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld2.FromBlockAndReceipts(block, receipts)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
|
||||
}
|
||||
|
||||
if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
|
||||
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
|
||||
}
|
||||
if len(txTrieNodes) != len(rctTrieNodes) {
|
||||
return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
|
||||
}
|
||||
|
||||
// Calculate reward
|
||||
var reward *big.Int
|
||||
// in PoA networks block reward is 0
|
||||
if sdi.chainConfig.Clique != nil {
|
||||
reward = big.NewInt(0)
|
||||
} else {
|
||||
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
|
||||
}
|
||||
t = time.Now()
|
||||
|
||||
blockTx := &BatchTx{
|
||||
BlockNumber: height,
|
||||
submit: func(self *BatchTx, err error) error {
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
if err := sdi.writer.flush(); err != nil {
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tPostgresCommit.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
||||
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
||||
log.Debug(traceMsg)
|
||||
return err
|
||||
},
|
||||
}
|
||||
tDiff := time.Since(t)
|
||||
indexerMetrics.tFreePostgres.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write header, collect headerID
|
||||
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write uncles
|
||||
sdi.processUncles(headerID, height, uncleNodes)
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tUncleProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
// write receipts and txs
|
||||
err = sdi.processReceiptsAndTxs(processArgs{
|
||||
headerID: headerID,
|
||||
blockNumber: block.Number(),
|
||||
receipts: receipts,
|
||||
txs: transactions,
|
||||
rctNodes: rctNodes,
|
||||
rctTrieNodes: rctTrieNodes,
|
||||
txNodes: txNodes,
|
||||
txTrieNodes: txTrieNodes,
|
||||
logTrieNodes: logTrieNodes,
|
||||
logLeafNodeCIDs: logLeafNodeCIDs,
|
||||
rctLeafNodeCIDs: rctLeafNodeCIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tDiff = time.Since(t)
|
||||
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
||||
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
||||
t = time.Now()
|
||||
|
||||
return blockTx, err
|
||||
}
|
||||
|
||||
// processHeader write a header IPLD insert SQL stmt to a file
|
||||
// it returns the headerID
|
||||
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string {
|
||||
sdi.writer.upsertIPLDNode(headerNode)
|
||||
|
||||
var baseFee *int64
|
||||
if header.BaseFee != nil {
|
||||
baseFee = new(int64)
|
||||
*baseFee = header.BaseFee.Int64()
|
||||
}
|
||||
headerID := header.Hash().String()
|
||||
sdi.writer.upsertHeaderCID(models.HeaderModel{
|
||||
NodeID: sdi.nodeID,
|
||||
CID: headerNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
|
||||
ParentHash: header.ParentHash.String(),
|
||||
BlockNumber: header.Number.String(),
|
||||
BlockHash: headerID,
|
||||
TotalDifficulty: td.String(),
|
||||
Reward: reward.String(),
|
||||
Bloom: header.Bloom.Bytes(),
|
||||
StateRoot: header.Root.String(),
|
||||
RctRoot: header.ReceiptHash.String(),
|
||||
TxRoot: header.TxHash.String(),
|
||||
UncleRoot: header.UncleHash.String(),
|
||||
Timestamp: header.Time,
|
||||
BaseFee: baseFee,
|
||||
})
|
||||
return headerID
|
||||
}
|
||||
|
||||
// processUncles writes uncle IPLD insert SQL stmts to a file
|
||||
func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) {
|
||||
// publish and index uncles
|
||||
for _, uncleNode := range uncleNodes {
|
||||
sdi.writer.upsertIPLDNode(uncleNode)
|
||||
var uncleReward *big.Int
|
||||
// in PoA networks uncle reward is 0
|
||||
if sdi.chainConfig.Clique != nil {
|
||||
uncleReward = big.NewInt(0)
|
||||
} else {
|
||||
uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
|
||||
}
|
||||
sdi.writer.upsertUncleCID(models.UncleModel{
|
||||
HeaderID: headerID,
|
||||
CID: uncleNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
|
||||
ParentHash: uncleNode.ParentHash.String(),
|
||||
BlockHash: uncleNode.Hash().String(),
|
||||
Reward: uncleReward.String(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// processArgs bundles arguments to processReceiptsAndTxs
|
||||
type processArgs struct {
|
||||
headerID string
|
||||
blockNumber *big.Int
|
||||
receipts types.Receipts
|
||||
txs types.Transactions
|
||||
rctNodes []*ipld2.EthReceipt
|
||||
rctTrieNodes []*ipld2.EthRctTrie
|
||||
txNodes []*ipld2.EthTx
|
||||
txTrieNodes []*ipld2.EthTxTrie
|
||||
logTrieNodes [][]*ipld2.EthLogTrie
|
||||
logLeafNodeCIDs [][]cid.Cid
|
||||
rctLeafNodeCIDs []cid.Cid
|
||||
}
|
||||
|
||||
// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file
|
||||
func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
|
||||
// Process receipts and txs
|
||||
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
|
||||
for i, receipt := range args.receipts {
|
||||
for _, logTrieNode := range args.logTrieNodes[i] {
|
||||
sdi.writer.upsertIPLDNode(logTrieNode)
|
||||
}
|
||||
txNode := args.txNodes[i]
|
||||
sdi.writer.upsertIPLDNode(txNode)
|
||||
|
||||
// index tx
|
||||
trx := args.txs[i]
|
||||
txID := trx.Hash().String()
|
||||
// derive sender for the tx that corresponds with this receipt
|
||||
from, err := types.Sender(signer, trx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deriving tx sender: %v", err)
|
||||
}
|
||||
txModel := models.TxModel{
|
||||
HeaderID: args.headerID,
|
||||
Dst: shared.HandleZeroAddrPointer(trx.To()),
|
||||
Src: shared.HandleZeroAddr(from),
|
||||
TxHash: txID,
|
||||
Index: int64(i),
|
||||
Data: trx.Data(),
|
||||
CID: txNode.Cid().String(),
|
||||
MhKey: shared.MultihashKeyFromCID(txNode.Cid()),
|
||||
Type: trx.Type(),
|
||||
}
|
||||
sdi.writer.upsertTransactionCID(txModel)
|
||||
|
||||
// index access list if this is one
|
||||
for j, accessListElement := range trx.AccessList() {
|
||||
storageKeys := make([]string, len(accessListElement.StorageKeys))
|
||||
for k, storageKey := range accessListElement.StorageKeys {
|
||||
storageKeys[k] = storageKey.Hex()
|
||||
}
|
||||
accessListElementModel := models.AccessListElementModel{
|
||||
TxID: txID,
|
||||
Index: int64(j),
|
||||
Address: accessListElement.Address.Hex(),
|
||||
StorageKeys: storageKeys,
|
||||
}
|
||||
sdi.writer.upsertAccessListElement(accessListElementModel)
|
||||
}
|
||||
|
||||
// this is the contract address if this receipt is for a contract creation tx
|
||||
contract := shared.HandleZeroAddr(receipt.ContractAddress)
|
||||
var contractHash string
|
||||
if contract != "" {
|
||||
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
|
||||
}
|
||||
|
||||
// index receipt
|
||||
if !args.rctLeafNodeCIDs[i].Defined() {
|
||||
return fmt.Errorf("invalid receipt leaf node cid")
|
||||
}
|
||||
|
||||
rctModel := &models.ReceiptModel{
|
||||
TxID: txID,
|
||||
Contract: contract,
|
||||
ContractHash: contractHash,
|
||||
LeafCID: args.rctLeafNodeCIDs[i].String(),
|
||||
LeafMhKey: shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]),
|
||||
LogRoot: args.rctNodes[i].LogRoot.String(),
|
||||
}
|
||||
if len(receipt.PostState) == 0 {
|
||||
rctModel.PostStatus = receipt.Status
|
||||
} else {
|
||||
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
|
||||
}
|
||||
sdi.writer.upsertReceiptCID(rctModel)
|
||||
|
||||
// index logs
|
||||
logDataSet := make([]*models.LogsModel, len(receipt.Logs))
|
||||
for idx, l := range receipt.Logs {
|
||||
topicSet := make([]string, 4)
|
||||
for ti, topic := range l.Topics {
|
||||
topicSet[ti] = topic.Hex()
|
||||
}
|
||||
|
||||
if !args.logLeafNodeCIDs[i][idx].Defined() {
|
||||
return fmt.Errorf("invalid log cid")
|
||||
}
|
||||
|
||||
logDataSet[idx] = &models.LogsModel{
|
||||
ReceiptID: txID,
|
||||
Address: l.Address.String(),
|
||||
Index: int64(l.Index),
|
||||
Data: l.Data,
|
||||
LeafCID: args.logLeafNodeCIDs[i][idx].String(),
|
||||
LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
|
||||
Topic0: topicSet[0],
|
||||
Topic1: topicSet[1],
|
||||
Topic2: topicSet[2],
|
||||
Topic3: topicSet[3],
|
||||
}
|
||||
}
|
||||
sdi.writer.upsertLogCID(logDataSet)
|
||||
}
|
||||
|
||||
// publish trie nodes, these aren't indexed directly
|
||||
for i, n := range args.txTrieNodes {
|
||||
sdi.writer.upsertIPLDNode(n)
|
||||
sdi.writer.upsertIPLDNode(args.rctTrieNodes[i])
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
|
||||
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateNode, headerID string) error {
|
||||
// publish the state node
|
||||
if stateNode.NodeType == sdtypes.Removed {
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
|
||||
stateModel := models.StateNodeModel{
|
||||
HeaderID: headerID,
|
||||
Path: stateNode.Path,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStateCID,
|
||||
MhKey: shared.RemovedNodeMhKey,
|
||||
NodeType: stateNode.NodeType.Int(),
|
||||
}
|
||||
sdi.writer.upsertStateCID(stateModel)
|
||||
return nil
|
||||
}
|
||||
stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
|
||||
}
|
||||
stateModel := models.StateNodeModel{
|
||||
HeaderID: headerID,
|
||||
Path: stateNode.Path,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
CID: stateCIDStr,
|
||||
MhKey: stateMhKey,
|
||||
NodeType: stateNode.NodeType.Int(),
|
||||
}
|
||||
// index the state node
|
||||
sdi.writer.upsertStateCID(stateModel)
|
||||
// if we have a leaf, decode and index the account data
|
||||
if stateNode.NodeType == sdtypes.Leaf {
|
||||
var i []interface{}
|
||||
if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
|
||||
return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
|
||||
}
|
||||
if len(i) != 2 {
|
||||
return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
|
||||
}
|
||||
var account types.StateAccount
|
||||
if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
|
||||
return fmt.Errorf("error decoding state account rlp: %s", err.Error())
|
||||
}
|
||||
accountModel := models.StateAccountModel{
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Balance: account.Balance.String(),
|
||||
Nonce: account.Nonce,
|
||||
CodeHash: account.CodeHash,
|
||||
StorageRoot: account.Root.String(),
|
||||
}
|
||||
sdi.writer.upsertStateAccount(accountModel)
|
||||
}
|
||||
// if there are any storage nodes associated with this node, publish and index them
|
||||
for _, storageNode := range stateNode.StorageNodes {
|
||||
if storageNode.NodeType == sdtypes.Removed {
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
|
||||
storageModel := models.StorageNodeModel{
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Path: storageNode.Path,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: shared.RemovedNodeStorageCID,
|
||||
MhKey: shared.RemovedNodeMhKey,
|
||||
NodeType: storageNode.NodeType.Int(),
|
||||
}
|
||||
sdi.writer.upsertStorageCID(storageModel)
|
||||
continue
|
||||
}
|
||||
storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
|
||||
}
|
||||
storageModel := models.StorageNodeModel{
|
||||
HeaderID: headerID,
|
||||
StatePath: stateNode.Path,
|
||||
Path: storageNode.Path,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: storageCIDStr,
|
||||
MhKey: storageMhKey,
|
||||
NodeType: storageNode.NodeType.Int(),
|
||||
}
|
||||
sdi.writer.upsertStorageCID(storageModel)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushCodeAndCodeHash writes code and codehash pairs insert SQL stmts to a file
|
||||
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
|
||||
// codec doesn't matter since db key is multihash-based
|
||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
||||
}
|
||||
sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.writer.Close()
|
||||
}
|
94
statediff/indexer/database/file/metrics.go
Normal file
94
statediff/indexer/database/file/metrics.go
Normal file
@ -0,0 +1,94 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2021 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
)
|
||||
|
||||
const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
return ""
|
||||
}
|
||||
parts := []string{namespace, name}
|
||||
if subsystem != "" {
|
||||
parts = []string{namespace, subsystem, name}
|
||||
}
|
||||
// Prometheus uses _ but geth metrics uses / and replaces
|
||||
return strings.Join(parts, "/")
|
||||
}
|
||||
|
||||
type indexerMetricsHandles struct {
|
||||
// The total number of processed blocks
|
||||
blocks metrics.Counter
|
||||
// The total number of processed transactions
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// The total number of processed logs
|
||||
logs metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
accessListEntries metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
tFreePostgres metrics.Timer
|
||||
// Postgres transaction commit duration
|
||||
tPostgresCommit metrics.Timer
|
||||
// Header processing time
|
||||
tHeaderProcessing metrics.Timer
|
||||
// Uncle processing time
|
||||
tUncleProcessing metrics.Timer
|
||||
// Tx and receipt processing time
|
||||
tTxAndRecProcessing metrics.Timer
|
||||
// State, storage, and code combined processing time
|
||||
tStateStoreCodeProcessing metrics.Timer
|
||||
}
|
||||
|
||||
func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
ctx := indexerMetricsHandles{
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
logs: metrics.NewCounter(),
|
||||
accessListEntries: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
tHeaderProcessing: metrics.NewTimer(),
|
||||
tUncleProcessing: metrics.NewTimer(),
|
||||
tTxAndRecProcessing: metrics.NewTimer(),
|
||||
tStateStoreCodeProcessing: metrics.NewTimer(),
|
||||
}
|
||||
subsys := "indexer"
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.logs)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
reg.Register(metricName(subsys, "t_header_processing"), ctx.tHeaderProcessing)
|
||||
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.tUncleProcessing)
|
||||
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.tTxAndRecProcessing)
|
||||
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.tStateStoreCodeProcessing)
|
||||
return ctx
|
||||
}
|
240
statediff/indexer/database/file/writer.go
Normal file
240
statediff/indexer/database/file/writer.go
Normal file
@ -0,0 +1,240 @@
|
||||
// VulcanizeDB
|
||||
// Copyright © 2019 Vulcanize
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package file
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
)
|
||||
|
||||
var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
collatedStmtSize = 65336 // min(linuxPipeSize, macOSPipeSize)
|
||||
)
|
||||
|
||||
// SQLWriter writes sql statements to a file
|
||||
type SQLWriter struct {
|
||||
file *os.File
|
||||
stmts chan []byte
|
||||
collatedStmt []byte
|
||||
collationIndex int
|
||||
|
||||
quitChan chan struct{}
|
||||
doneChan chan struct{}
|
||||
}
|
||||
|
||||
// NewSQLWriter creates a new pointer to a Writer
|
||||
func NewSQLWriter(file *os.File) *SQLWriter {
|
||||
return &SQLWriter{
|
||||
file: file,
|
||||
stmts: make(chan []byte),
|
||||
collatedStmt: make([]byte, collatedStmtSize),
|
||||
quitChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Loop enables concurrent writes to the underlying os.File
|
||||
// since os.File does not buffer, it utilizes an internal buffer that is the size of a unix pipe
|
||||
// by using copy() and tracking the index/size of the buffer, we require only the initial memory allocation
|
||||
func (sqw *SQLWriter) Loop() {
|
||||
sqw.collationIndex = 0
|
||||
go func() {
|
||||
defer func() {
|
||||
close(sqw.doneChan)
|
||||
}()
|
||||
var l int
|
||||
for {
|
||||
select {
|
||||
case stmt := <-sqw.stmts:
|
||||
l = len(stmt)
|
||||
if l+sqw.collationIndex+1 > collatedStmtSize {
|
||||
if err := sqw.flush(); err != nil {
|
||||
log.Error("error writing cached sql stmts to file", "err", err)
|
||||
}
|
||||
}
|
||||
copy(sqw.collatedStmt[sqw.collationIndex:sqw.collationIndex+l-1], stmt)
|
||||
sqw.collationIndex += l
|
||||
case <-sqw.quitChan:
|
||||
if err := sqw.flush(); err != nil {
|
||||
log.Error("error writing cached sql stmts to file", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (sqw *SQLWriter) Close() error {
|
||||
close(sqw.quitChan)
|
||||
<-sqw.doneChan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) flush() error {
|
||||
if _, err := sqw.file.Write(sqw.collatedStmt[0 : sqw.collationIndex-1]); err != nil {
|
||||
return err
|
||||
}
|
||||
sqw.collationIndex = 0
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n`
|
||||
|
||||
headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
||||
VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
|
||||
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n`
|
||||
|
||||
headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
|
||||
VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
|
||||
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n`
|
||||
|
||||
uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (block_hash) DO NOTHING;\n`
|
||||
|
||||
txInsert = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %d)
|
||||
ON CONFLICT (tx_hash) DO NOTHING;\n`
|
||||
|
||||
alInsert = `INSERT INTO eth.access_list_element (tx_id, index, address, storage_keys) VALUES (%s, %d, %s, %s)
|
||||
ON CONFLICT (tx_id, index) DO NOTHING;\n`
|
||||
|
||||
rctInsert = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (%s, %s, %s, %s, %s, %s, %d, %s)
|
||||
ON CONFLICT (tx_id) DO NOTHING;\n`
|
||||
|
||||
logInsert = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (%s, %s, %s, %s, %d, %s, %s, %s, %s, %s)
|
||||
ON CONFLICT (rct_id, index) DO NOTHING;\n`
|
||||
|
||||
stateInsert = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %d, %t, %s)
|
||||
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n`
|
||||
|
||||
accountInsert = `INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES (%s, %s, %s, %d, %s, %s)
|
||||
ON CONFLICT (header_id, state_path) DO NOTHING;\n`
|
||||
|
||||
storageInsert = `INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (%s, %s, %s, %s, %s, %d, %t, %s)
|
||||
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n`
|
||||
)
|
||||
|
||||
func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertIPLDDirect(key string, value []byte) {
|
||||
sqw.upsertIPLD(models.IPLDModel{
|
||||
Key: key,
|
||||
Data: value,
|
||||
})
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertIPLDNode(i node.Node) {
|
||||
sqw.upsertIPLD(models.IPLDModel{
|
||||
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
|
||||
Data: i.RawData(),
|
||||
})
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertIPLDRaw(codec, mh uint64, raw []byte) (string, string, error) {
|
||||
c, err := ipld.RawdataToCid(codec, raw, mh)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
|
||||
sqw.upsertIPLD(models.IPLDModel{
|
||||
Key: prefixedKey,
|
||||
Data: raw,
|
||||
})
|
||||
return c.String(), prefixedKey, err
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
|
||||
var stmt string
|
||||
if header.BaseFee == nil {
|
||||
stmt = fmt.Sprintf(headerInsertWithoutBaseFee, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1,
|
||||
header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
|
||||
header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey)
|
||||
} else {
|
||||
stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
|
||||
header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot,
|
||||
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee,
|
||||
header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot,
|
||||
header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, header.BaseFee)
|
||||
}
|
||||
sqw.stmts <- []byte(stmt)
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertUncleCID(uncle models.UncleModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(uncleInsert, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(txInsert, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type))
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertAccessListElement(accessListElement models.AccessListElementModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(alInsert, accessListElement.TxID, accessListElement.Index, accessListElement.Address, formatPostgresStringArray(accessListElement.StorageKeys)))
|
||||
indexerMetrics.accessListEntries.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot))
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
|
||||
for _, l := range logs {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(logInsert, l.LeafCID, l.LeafMhKey, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3, l.Data))
|
||||
indexerMetrics.logs.Inc(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
|
||||
var stateKey string
|
||||
if stateNode.StateKey != nullHash.String() {
|
||||
stateKey = stateNode.StateKey
|
||||
}
|
||||
sqw.stmts <- []byte(fmt.Sprintf(stateInsert, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType,
|
||||
true, stateNode.MhKey, stateKey, stateNode.CID, stateNode.NodeType, true, stateNode.MhKey))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertStateAccount(stateAccount models.StateAccountModel) {
|
||||
sqw.stmts <- []byte(fmt.Sprintf(accountInsert, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
|
||||
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot))
|
||||
}
|
||||
|
||||
func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
|
||||
var storageKey string
|
||||
if storageCID.StorageKey != nullHash.String() {
|
||||
storageKey = storageCID.StorageKey
|
||||
}
|
||||
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
|
||||
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey, storageKey, storageCID.CID, storageCID.NodeType,
|
||||
true, storageCID.MhKey))
|
||||
}
|
@ -539,7 +539,7 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close satisfied io.Closer
|
||||
// Close satisfies io.Closer
|
||||
func (sdi *StateDiffIndexer) Close() error {
|
||||
return sdi.dbWriter.db.Close()
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ type DBType string
|
||||
const (
|
||||
POSTGRES DBType = "Postgres"
|
||||
DUMP DBType = "Dump"
|
||||
FILE DBType = "File"
|
||||
UNKNOWN DBType = "Unknown"
|
||||
)
|
||||
|
||||
@ -37,6 +38,8 @@ func ResolveDBType(str string) (DBType, error) {
|
||||
return POSTGRES, nil
|
||||
case "dump", "d":
|
||||
return DUMP, nil
|
||||
case "file", "f", "fs":
|
||||
return FILE, nil
|
||||
default:
|
||||
return UNKNOWN, fmt.Errorf("unrecognized db type string: %s", str)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user