batch inserts to public.blocks

This commit is contained in:
i-norden 2021-11-08 18:30:35 -06:00
parent f9ca3d09c2
commit dcaaa40067
9 changed files with 257 additions and 129 deletions

View File

@ -0,0 +1,91 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package indexer
import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
const ipldBatchInsertPgStr string = `INSERT INTO public.blocks (key, data) VALUES (unnest($1::TEXT[]), unnest($2::BYTEA[])) ON CONFLICT (key) DO NOTHING`
// BlockTx wraps a Postgres tx with the state necessary for building the Postgres tx concurrently during trie difference iteration
type BlockTx struct {
dbtx *sqlx.Tx
BlockNumber uint64
headerID int64
Close func(blockTx *BlockTx, err error) error
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
}
func (tx *BlockTx) flush() error {
_, err := tx.dbtx.Exec(ipldBatchInsertPgStr, pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values))
if err != nil {
return err
}
tx.ipldCache = models.IPLDBatch{}
return nil
}
// run in background goroutine to synchronize concurrent appends to the ipldCache
func (tx *BlockTx) cache() {
for {
select {
case i := <-tx.iplds:
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
case <-tx.quit:
return
}
}
}
func (tx *BlockTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
Key: key,
Data: value,
}
}
func (tx *BlockTx) cacheIPLD(i node.Node) {
tx.iplds <- models.IPLDModel{
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
Data: i.RawData(),
}
}
func (tx *BlockTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", "", err
}
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
tx.iplds <- models.IPLDModel{
Key: prefixedKey,
Data: raw,
}
return c.String(), prefixedKey, err
}

View File

@ -25,7 +25,6 @@ import (
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/common"
@ -65,7 +64,6 @@ type Indexer interface {
type StateDiffIndexer struct {
chainConfig *params.ChainConfig
dbWriter *PostgresCIDWriter
init bool
}
// NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
@ -80,13 +78,6 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*Sta
}, nil
}
type BlockTx struct {
dbtx *sqlx.Tx
BlockNumber uint64
headerID int64
Close func(err error) error
}
// ReportDBMetrics is a reporting function to run as goroutine
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
if !metrics.Enabled {
@ -127,7 +118,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d)to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d) to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
}
if len(txTrieNodes) != len(rctTrieNodes) {
return nil, fmt.Errorf("expected number of tx trie (%d) and rct trie (%d) nodes to be equal", len(txTrieNodes), len(rctTrieNodes))
}
// Calculate reward
@ -139,6 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
reward = CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
// Begin new db tx for everything
tx, err := sdi.dbWriter.db.Beginx()
if err != nil {
@ -153,9 +148,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
}()
blockTx := &BlockTx{
dbtx: tx,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
dbtx: tx,
// handle transaction commit or rollback for any return case
Close: func(err error) error {
Close: func(self *BlockTx, err error) error {
close(self.quit)
close(self.iplds)
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
@ -166,6 +166,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
shared.Rollback(tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
}
err = tx.Commit()
tDiff = time.Since(t)
indexerMetrics.tPostgresCommit.Update(tDiff)
@ -176,6 +182,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
indexerMetrics.tFreePostgres.Update(tDiff)
@ -184,7 +192,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Publish and index header, collect headerID
var headerID int64
headerID, err = sdi.processHeader(tx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
if err != nil {
return nil, err
}
@ -193,7 +201,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(tx, headerID, height, uncleNodes)
err = sdi.processUncles(blockTx, headerID, height, uncleNodes)
if err != nil {
return nil, err
}
@ -202,7 +210,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(tx, processArgs{
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
@ -230,11 +238,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// processHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
// publish header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
return 0, fmt.Errorf("error publishing header IPLD: %v", err)
}
func (sdi *StateDiffIndexer) processHeader(tx *BlockTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
tx.cacheIPLD(headerNode)
var baseFee *int64
if header.BaseFee != nil {
@ -243,7 +248,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
}
// index header
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
return sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: header.ParentHash.String(),
@ -262,12 +267,10 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
}
// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
func (sdi *StateDiffIndexer) processUncles(tx *BlockTx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
return fmt.Errorf("error publishing uncle IPLD: %v", err)
}
tx.cacheIPLD(uncleNode)
var uncleReward *big.Int
// in PoA networks uncle reward is 0
if sdi.chainConfig.Clique != nil {
@ -282,7 +285,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNum
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if err := sdi.dbWriter.upsertUncleCID(tx, uncle, headerID); err != nil {
if err := sdi.dbWriter.upsertUncleCID(tx.dbtx, uncle, headerID); err != nil {
return err
}
}
@ -305,28 +308,15 @@ type processArgs struct {
}
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BlockTx, args processArgs) error {
// Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
for i, receipt := range args.receipts {
// tx that corresponds with this receipt
trx := args.txs[i]
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
for _, logTrieNode := range args.logTrieNodes[i] {
tx.cacheIPLD(logTrieNode)
}
for _, trie := range args.logTrieNodes[i] {
if err = shared.PublishIPLD(tx, trie); err != nil {
return fmt.Errorf("error publishing log trie node IPLD: %w", err)
}
}
// publish the txs and receipts
txNode := args.txNodes[i]
if err := shared.PublishIPLD(tx, txNode); err != nil {
return fmt.Errorf("error publishing tx IPLD: %v", err)
}
tx.cacheIPLD(txNode)
// Indexing
// extract topic and contract data from the receipt for indexing
@ -344,7 +334,6 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
mappedContracts[l.Address.String()] = true
logDataSet[idx] = &models.LogsModel{
ID: 0,
Address: l.Address.String(),
Index: int64(l.Index),
Data: l.Data,
@ -368,6 +357,12 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
}
// index tx first so that the receipt can reference it by FK
trx := args.txs[i]
// derive sender for the tx that corresponds with this receipt
from, err := types.Sender(signer, trx)
if err != nil {
return fmt.Errorf("error deriving tx sender: %v", err)
}
txModel := models.TxModel{
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
@ -381,7 +376,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
if txType != types.LegacyTxType {
txModel.Type = &txType
}
txID, err := sdi.dbWriter.upsertTransactionCID(tx, txModel, args.headerID)
txID, err := sdi.dbWriter.upsertTransactionCID(tx.dbtx, txModel, args.headerID)
if err != nil {
return err
}
@ -397,7 +392,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
}
if err := sdi.dbWriter.upsertAccessListElement(tx, accessListElementModel, txID); err != nil {
if err := sdi.dbWriter.upsertAccessListElement(tx.dbtx, accessListElementModel, txID); err != nil {
return err
}
}
@ -420,27 +415,20 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
}
receiptID, err := sdi.dbWriter.upsertReceiptCID(tx, rctModel, txID)
receiptID, err := sdi.dbWriter.upsertReceiptCID(tx.dbtx, rctModel, txID)
if err != nil {
return err
}
if err = sdi.dbWriter.upsertLogCID(tx, logDataSet, receiptID); err != nil {
if err = sdi.dbWriter.upsertLogCID(tx.dbtx, logDataSet, receiptID); err != nil {
return err
}
}
// publish trie nodes, these aren't indexed directly
for _, n := range args.txTrieNodes {
if err := shared.PublishIPLD(tx, n); err != nil {
return fmt.Errorf("error publishing tx trie node IPLD: %w", err)
}
}
for _, n := range args.rctTrieNodes {
if err := shared.PublishIPLD(tx, n); err != nil {
return fmt.Errorf("error publishing rct trie node IPLD: %w", err)
}
for i, n := range args.txTrieNodes {
tx.cacheIPLD(n)
tx.cacheIPLD(args.rctTrieNodes[i])
}
return nil
@ -462,9 +450,9 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
return err
}
stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing state node IPLD: %v", err)
return fmt.Errorf("error generating and cacheing state node IPLD: %v", err)
}
stateModel := models.StateNodeModel{
Path: stateNode.Path,
@ -518,9 +506,9 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
}
continue
}
storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
storageCIDStr, storageMhKey, err := tx.cacheRaw(ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing storage node IPLD: %v", err)
return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err)
}
storageModel := models.StorageNodeModel{
Path: storageNode.Path,
@ -544,8 +532,6 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sd
if err != nil {
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
}
if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil {
return fmt.Errorf("error publishing code IPLD: %v", err)
}
tx.cacheDirect(mhKey, codeAndCodeHash.Code)
return nil
}

View File

@ -51,7 +51,7 @@ func setupLegacy(t *testing.T) {
legacyData.MockBlock.Difficulty())
require.NoError(t, err)
defer tx.Close(err)
defer tx.Close(tx, err)
for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node)
require.NoError(t, err)

View File

@ -149,7 +149,7 @@ func setup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer tx.Close(err)
defer tx.Close(tx, err)
for _, node := range mocks.StateDiffs {
err = ind.PushStateNode(tx, node)
if err != nil {

View File

@ -0,0 +1,100 @@
package models
import "github.com/lib/pq"
// IPLDBatch holds the arguments for a batch insert of IPLD data
type IPLDBatch struct {
Keys []string
Values [][]byte
}
// UncleBatch holds the arguments for a batch insert of uncle data
type UncleBatch struct {
HeaderID []int64
BlockHashes []string
ParentHashes []string
CIDs []string
MhKeys []string
Rewards []string
}
// TxBatch holds the arguments for a batch insert of tx data
type TxBatch struct {
HeaderID int64
Indexes []int64
TxHashes []string
CIDs []string
MhKeys []string
Dsts []string
Srcs []string
Datas [][]byte
Types []*uint8
}
// AccessListBatch holds the arguments for a batch insert of access list data
type AccessListBatch struct {
Indexes []int64
TxIDs []int64
Addresses []string
StorageKeysSets []pq.StringArray
}
// ReceiptBatch holds the arguments for a batch insert of receipt data
type ReceiptBatch struct {
TxIDs []int64
LeafCIDs []string
LeafMhKeys []string
PostStatuses []uint64
PostStates []string
Contracts []string
ContractHashes []string
LogRoots []string
}
// LogBatch holds the arguments for a batch insert of log data
type LogBatch struct {
LeafCIDs []string
LeafMhKeys []string
ReceiptIDs []int64
Addresses []string
Indexes []int64
Datas [][]byte
Topic0s []string
Topic1s []string
Topic2s []string
Topic3s []string
}
// StateBatch holds the arguments for a batch insert of state data
type StateBatch struct {
ID int64
HeaderID int64
Path []byte
StateKey string
NodeType int
CID string
MhKey string
Diff bool
}
// AccountBatch holds the arguments for a batch insert of account data
type AccountBatch struct {
ID int64
StateID int64
Balance string
Nonce uint64
CodeHash []byte
StorageRoot string
}
// StorageBatch holds the arguments for a batch insert of storage data
type StorageBatch struct {
ID int64
StateID int64
Path []byte
StorageKey string
NodeType int
CID string
MhKey string
Diff bool
}

View File

@ -18,6 +18,12 @@ package models
import "github.com/lib/pq"
// IPLDModel is the db model for public.blocks
type IPLDModel struct {
Key string
Data []byte
}
// HeaderModel is the db model for eth.header_cids
type HeaderModel struct {
ID int64 `db:"id"`

View File

@ -19,17 +19,18 @@ package shared
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
format "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
)
// IPLDInsertPgStr is the postgres statement string for IPLDs inserting into public.blocks
const IPLDInsertPgStr = `INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`
// HandleZeroAddrPointer will return an empty string for a nil address pointer
func HandleZeroAddrPointer(to *common.Address) string {
if to == nil {
@ -53,62 +54,12 @@ func Rollback(tx *sqlx.Tx) {
}
}
// PublishIPLD is used to insert an IPLD into Postgres blockstore with the provided tx
func PublishIPLD(tx *sqlx.Tx, i format.Node) error {
dbKey := dshelp.MultihashToDsKey(i.Cid().Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
raw := i.RawData()
_, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return err
}
// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx and cid string
func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) {
mhKey, err := MultihashKeyFromCIDString(cid)
if err != nil {
return nil, err
}
pgStr := `SELECT data FROM public.blocks WHERE key = $1`
var block []byte
return block, tx.Get(&block, pgStr, mhKey)
}
// FetchIPLDByMhKey is used to retrieve an ipld from Postgres blockstore with the provided tx and mhkey string
func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) {
pgStr := `SELECT data FROM public.blocks WHERE key = $1`
var block []byte
return block, tx.Get(&block, pgStr, mhKey)
}
// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string
func MultihashKeyFromCID(c cid.Cid) string {
dbKey := dshelp.MultihashToDsKey(c.Hash())
return blockstore.BlockPrefix.String() + dbKey.String()
}
// MultihashKeyFromCIDString converts a cid string into a blockstore-prefixed multihash db key string
func MultihashKeyFromCIDString(c string) (string, error) {
dc, err := cid.Decode(c)
if err != nil {
return "", err
}
dbKey := dshelp.MultihashToDsKey(dc.Hash())
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
// returns the CID and blockstore prefixed multihash key
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", "", err
}
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return c.String(), prefixedKey, err
}
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
func MultihashKeyFromKeccak256(hash common.Hash) (string, error) {
mh, err := multihash.Encode(hash.Bytes(), multihash.KECCAK_256)
@ -119,14 +70,8 @@ func MultihashKeyFromKeccak256(hash common.Hash) (string, error) {
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database in the provided tx
func PublishDirect(tx *sqlx.Tx, key string, value []byte) error {
_, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
return err
}
// PublishDirectWithDB diretly writes a previously derived mhkey => value pair to the ipld database
func PublishDirectWithDB(db *postgres.DB, key string, value []byte) error {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
_, err := db.Exec(IPLDInsertPgStr, key, value)
return err
}

View File

@ -35,7 +35,7 @@ type PostgresCIDWriter struct {
db *postgres.DB
}
// NewPostgresCIDWriter creates a new pointer to a Indexer which satisfies the PostgresCIDWriter interface
// NewPostgresCIDWriter creates a new pointer to a PostgresCIDWriter
func NewPostgresCIDWriter(db *postgres.DB) *PostgresCIDWriter {
return &PostgresCIDWriter{
db: db,
@ -107,7 +107,7 @@ func (in *PostgresCIDWriter) upsertReceiptCID(tx *sqlx.Tx, rct *models.ReceiptMo
func (in *PostgresCIDWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel, receiptID int64) error {
for _, log := range logs {
_, err := tx.Exec(`INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key ,address, topic0, topic1, topic2, topic3,log_data ) = ($1, $2, $4, $6, $7, $8, $9, $10)`,
ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)`,
log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data)
if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err)

View File

@ -672,7 +672,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err
}
// defer handling of commit/rollback for any return case
defer tx.Close(err)
defer tx.Close(tx, err)
output := func(node StateNode) error {
return sds.indexer.PushStateNode(tx, node)
}