clean up db transaction rollback/commit handling

This commit is contained in:
Ian Norden 2020-05-01 16:25:58 -05:00
parent 44e7f0570f
commit 9b0ded692b
14 changed files with 294 additions and 192 deletions

View File

@ -65,17 +65,26 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
return nil, true, fmt.Errorf("btc retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
log.Debug("retrieving cids")
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return nil, true, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Retrieve cached header CIDs
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return nil, true, err
}
@ -92,9 +101,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("transaction cid retrieval error")
return nil, true, err
}
@ -105,7 +111,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
cws[i] = cw
}
return cws, empty, tx.Commit()
return cws, empty, err
}
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
@ -202,41 +208,57 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []TxModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, err
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, err
}
return headerCID, txCIDs, tx.Commit()
return headerCID, txCIDs, err
}
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []TxModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, err
}
@ -245,13 +267,9 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, err
}
return headerCID[0], txCIDs, tx.Commit()
return headerCID[0], txCIDs, err
}
// RetrieveHeaderCIDByHash returns the header for the given block hash

View File

@ -50,9 +50,7 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
SET times_validated = 0
WHERE block_number BETWEEN $1 AND $2`
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
shared.Rollback(tx)
return err
}
}
@ -68,9 +66,7 @@ func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
for _, rng := range rngs {
logrus.Infof("btc db cleaner cleaning up block range %d to %d", rng[0], rng[1])
if err := c.clean(tx, rng, t); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
shared.Rollback(tx)
return err
}
}

View File

@ -43,20 +43,33 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
if !ok {
return fmt.Errorf("btc indexer expected cids type %T got %T", &CIDPayload{}, cids)
}
// Begin new db tx
tx, err := in.db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerID, err := in.indexHeaderCID(tx, cidWrapper.HeaderCID)
if err != nil {
logrus.Error("btc indexer error when indexing header")
return err
}
if err := in.indexTransactionCIDs(tx, cidWrapper.TransactionCIDs, headerID); err != nil {
err = in.indexTransactionCIDs(tx, cidWrapper.TransactionCIDs, headerID)
if err != nil {
logrus.Error("btc indexer error when indexing transactions")
return err
}
return tx.Commit()
return err
}
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {

View File

@ -49,22 +49,31 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
log.Debug("fetching iplds")
iplds := IPLDs{}
iplds.BlockNumber = cidWrapper.BlockNumber
var err error
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("btc pg fetcher: header fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("btc pg fetcher: transaction fetching error: %s", err.Error())
}
return iplds, tx.Commit()
return iplds, err
}
// FetchHeaders fetches headers

View File

@ -56,18 +56,26 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
shared.Rollback(tx)
return nil, err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
shared.Rollback(tx)
return nil, err
}
header := HeaderModel{
@ -80,39 +88,34 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
shared.Rollback(tx)
return nil, err
}
// Publish and index txs
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
shared.Rollback(tx)
return nil, err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
shared.Rollback(tx)
return nil, err
}
for _, input := range txModel.TxInputs {
if err := pub.indexer.indexTxInput(tx, input, txID); err != nil {
shared.Rollback(tx)
return nil, err
}
}
for _, output := range txModel.TxOutputs {
if err := pub.indexer.indexTxOutput(tx, output, txID); err != nil {
shared.Rollback(tx)
return nil, err
}
}
}
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, tx.Commit()
return nil, err
}
// Index satisfies the shared.CIDIndexer interface

View File

@ -18,7 +18,6 @@ package btc_test
import (
"bytes"
"fmt"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
@ -112,7 +111,6 @@ var _ = Describe("PublishAndIndexer", func() {
Expect(err).ToNot(HaveOccurred())
mhKey := dshelp.CidToDsKey(dc)
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
fmt.Printf("mhKey: %s\r\n", prefixedKey)
var data []byte
err = db.Get(&data, ipfsPgGet, prefixedKey)
Expect(err).ToNot(HaveOccurred())

View File

@ -20,6 +20,8 @@ import (
"context"
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -34,19 +36,19 @@ const APIName = "eth"
const APIVersion = "0.0.1"
type PublicEthAPI struct {
b *Backend
B *Backend
}
// NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend
func NewPublicEthAPI(b *Backend) *PublicEthAPI {
return &PublicEthAPI{
b: b,
B: b,
}
}
// BlockNumber returns the block number of the chain head.
func (pea *PublicEthAPI) BlockNumber() hexutil.Uint64 {
number, _ := pea.b.Retriever.RetrieveLastBlockNumber()
number, _ := pea.B.Retriever.RetrieveLastBlockNumber()
return hexutil.Uint64(number)
}
@ -73,20 +75,36 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
LogAddresses: addrStrs,
Topics: topicStrSets,
}
tx, err := pea.b.DB.Beginx()
// Begin tx
tx, err := pea.B.DB.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// If we have a blockhash to filter on, fire off single retrieval query
if crit.BlockHash != nil {
rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil)
rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil)
if err != nil {
return nil, err
}
rctIPLDs, err := pea.b.Fetcher.FetchRcts(tx, rctCIDs)
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return extractLogsOfInterest(rctIPLDs, filter.Topics)
}
// Otherwise, create block range from criteria
@ -94,14 +112,14 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
startingBlock := crit.FromBlock
endingBlock := crit.ToBlock
if startingBlock == nil {
startingBlockInt, err := pea.b.Retriever.RetrieveFirstBlockNumber()
startingBlockInt, err := pea.B.Retriever.RetrieveFirstBlockNumber()
if err != nil {
return nil, err
}
startingBlock = big.NewInt(startingBlockInt)
}
if endingBlock == nil {
endingBlockInt, err := pea.b.Retriever.RetrieveLastBlockNumber()
endingBlockInt, err := pea.B.Retriever.RetrieveLastBlockNumber()
if err != nil {
return nil, err
}
@ -111,27 +129,28 @@ func (pea *PublicEthAPI) GetLogs(ctx context.Context, crit ethereum.FilterQuery)
end := endingBlock.Int64()
allRctCIDs := make([]ReceiptModel, 0)
for i := start; i <= end; i++ {
rctCIDs, err := pea.b.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
if err != nil {
return nil, err
}
allRctCIDs = append(allRctCIDs, rctCIDs...)
}
rctIPLDs, err := pea.b.Fetcher.FetchRcts(tx, allRctCIDs)
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, allRctCIDs)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return extractLogsOfInterest(rctIPLDs, filter.Topics)
logs, err := extractLogsOfInterest(rctIPLDs, filter.Topics)
return logs, err // need to return err variable so that we return the err = tx.Commit() assignment in the defer
}
// GetHeaderByNumber returns the requested canonical block header.
// * When blockNr is -1 the chain head is returned.
// * We cannot support pending block calls since we do not have an active miner
func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.BlockNumber) (map[string]interface{}, error) {
header, err := pea.b.HeaderByNumber(ctx, number)
header, err := pea.B.HeaderByNumber(ctx, number)
if header != nil && err == nil {
return pea.rpcMarshalHeader(header)
}
@ -144,7 +163,7 @@ func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.Block
// * When fullTx is true all transactions in the block are returned, otherwise
// only the transaction hash is returned.
func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockNumber, fullTx bool) (map[string]interface{}, error) {
block, err := pea.b.BlockByNumber(ctx, number)
block, err := pea.B.BlockByNumber(ctx, number)
if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
@ -154,7 +173,7 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN
// GetBlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full
// detail, otherwise only the transaction hash is returned.
func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, fullTx bool) (map[string]interface{}, error) {
block, err := pea.b.BlockByHash(ctx, hash)
block, err := pea.B.BlockByHash(ctx, hash)
if block != nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
@ -165,7 +184,7 @@ func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, f
// SuperNode cannot currently handle pending/tx_pool txs
func (pea *PublicEthAPI) GetTransactionByHash(ctx context.Context, hash common.Hash) (*RPCTransaction, error) {
// Try to return an already finalized transaction
tx, blockHash, blockNumber, index, err := pea.b.GetTransaction(ctx, hash)
tx, blockHash, blockNumber, index, err := pea.B.GetTransaction(ctx, hash)
if err != nil {
return nil, err
}

View File

@ -166,8 +166,20 @@ var _ = Describe("API", func() {
number, err := strconv.ParseInt(mocks.MockCIDPayload.HeaderCID.BlockNumber, 10, 64)
Expect(err).ToNot(HaveOccurred())
header, err := api.GetHeaderByNumber(context.Background(), rpc.BlockNumber(number))
Expect(err).ToNot(HaveOccurred())
Expect(header).To(Equal(expectedHeader))
})
It("Throws an error if a header cannot be found", func() {
number, err := strconv.ParseInt(mocks.MockCIDPayload.HeaderCID.BlockNumber, 10, 64)
Expect(err).ToNot(HaveOccurred())
header, err := api.GetHeaderByNumber(context.Background(), rpc.BlockNumber(number+1))
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("header at block %d is not available", number+1))
Expect(header).To(BeNil())
_, err = api.B.DB.Beginx()
Expect(err).ToNot(HaveOccurred())
})
})
Describe("GetBlockByHash", func() {

View File

@ -22,13 +22,13 @@ import (
"fmt"
"math/big"
"github.com/vulcanize/vulcanizedb/pkg/super_node/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/sirupsen/logrus"
"github.com/vulcanize/vulcanizedb/pkg/ipfs"
"github.com/vulcanize/vulcanizedb/pkg/postgres"
)
@ -52,8 +52,8 @@ func NewEthBackend(db *postgres.DB) (*Backend, error) {
}
func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Header, error) {
number := blockNumber.Int64()
var err error
number := blockNumber.Int64()
if blockNumber == rpc.LatestBlockNumber {
number, err = b.Retriever.RetrieveLastBlockNumber()
if err != nil {
@ -63,16 +63,26 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
if blockNumber == rpc.PendingBlockNumber {
return nil, errPendingBlockNumber
}
// Retrieve the CIDs for headers at this height
// Begin tx
tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Retrieve the CIDs for headers at this height
headerCids, err := b.Retriever.RetrieveHeaderCIDs(tx, number)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return nil, err
}
// If there are none, throw an error
@ -84,17 +94,12 @@ func (b *Backend) HeaderByNumber(ctx context.Context, blockNumber rpc.BlockNumbe
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
// Decode the first header at this block height and return it
// We throw an error in FetchHeaders() if the number of headers does not match the number of CIDs and we already
// confirmed the number of CIDs is greater than 0 so there is no need to bound check the slice before accessing
var header types.Header
if err := rlp.DecodeBytes(headerIPLD.Data, &header); err != nil {
return nil, err
}
return &header, nil
err = rlp.DecodeBytes(headerIPLD.Data, &header)
return &header, err
}
// GetTd retrieves and returns the total difficulty at the given block hash
@ -115,15 +120,23 @@ func (b *Backend) GetTd(blockHash common.Hash) (*big.Int, error) {
// GetLogs returns all the logs for the given block hash
func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
// Begin tx
tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
receiptCIDs, err := b.Retriever.RetrieveRctCIDs(tx, ReceiptFilter{}, 0, &hash, nil)
if err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
return nil, err
}
if len(receiptCIDs) == 0 {
@ -133,9 +146,6 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
logs := make([][]*types.Log, len(receiptIPLDs))
for i, rctIPLD := range receiptIPLDs {
var rct types.Receipt
@ -144,15 +154,15 @@ func (b *Backend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log
}
logs[i] = rct.Logs
}
return logs, nil
return logs, err
}
// BlockByNumber returns the requested canonical block.
// Since the SuperNode can contain forked blocks, it is recommended to fetch BlockByHash as
// fetching by number can return non-deterministic results (returns the first block found at that height)
func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber) (*types.Block, error) {
number := blockNumber.Int64()
var err error
number := blockNumber.Int64()
if blockNumber == rpc.LatestBlockNumber {
number, err = b.Retriever.RetrieveLastBlockNumber()
if err != nil {
@ -167,10 +177,22 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
if err != nil {
return nil, err
}
// Begin tx
tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Fetch and decode the header IPLD
headerIPLD, err := b.Fetcher.FetchHeader(tx, headerCID)
@ -201,20 +223,17 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx types.Transaction
if err := rlp.DecodeBytes(txIPLD.Data, &tx); err != nil {
var transaction types.Transaction
if err := rlp.DecodeBytes(txIPLD.Data, &transaction); err != nil {
return nil, err
}
transactions = append(transactions, &tx)
transactions = append(transactions, &transaction)
}
// Fetch and decode the receipt IPLDs
rctIPLDs, err := b.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt types.Receipt
@ -224,7 +243,7 @@ func (b *Backend) BlockByNumber(ctx context.Context, blockNumber rpc.BlockNumber
receipts = append(receipts, &receipt)
}
// Compose everything together into a complete block
return types.NewBlock(&header, transactions, uncles, receipts), nil
return types.NewBlock(&header, transactions, uncles, receipts), err
}
// BlockByHash returns the requested block. When fullTx is true all transactions in the block are returned in full
@ -235,10 +254,23 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
if err != nil {
return nil, err
}
// Begin tx
tx, err := b.DB.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Fetch and decode the header IPLD
headerIPLD, err := b.Fetcher.FetchHeader(tx, headerCID)
if err != nil {
@ -268,20 +300,17 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
}
var transactions []*types.Transaction
for _, txIPLD := range txIPLDs {
var tx types.Transaction
if err := rlp.DecodeBytes(txIPLD.Data, &tx); err != nil {
var transaction types.Transaction
if err := rlp.DecodeBytes(txIPLD.Data, &transaction); err != nil {
return nil, err
}
transactions = append(transactions, &tx)
transactions = append(transactions, &transaction)
}
// Fetch and decode the receipt IPLDs
rctIPLDs, err := b.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
var receipts []*types.Receipt
for _, rctIPLD := range rctIPLDs {
var receipt types.Receipt
@ -291,7 +320,7 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
receipts = append(receipts, &receipt)
}
// Compose everything together into a complete block
return types.NewBlock(&header, transactions, uncles, receipts), nil
return types.NewBlock(&header, transactions, uncles, receipts), err
}
// GetTransaction retrieves a tx by hash
@ -310,10 +339,23 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type
if err := b.DB.Get(&txCIDWithHeaderInfo, pgStr, txHash.String()); err != nil {
return nil, common.Hash{}, 0, 0, err
}
// Begin tx
tx, err := b.DB.Beginx()
if err != nil {
return nil, common.Hash{}, 0, 0, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{CID: txCIDWithHeaderInfo.CID}})
if err != nil {
return nil, common.Hash{}, 0, 0, err
@ -325,7 +367,7 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type
if err := rlp.DecodeBytes(txIPLD[0].Data, &transaction); err != nil {
return nil, common.Hash{}, 0, 0, err
}
return &transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), nil
return &transaction, common.HexToHash(txCIDWithHeaderInfo.BlockHash), uint64(txCIDWithHeaderInfo.BlockNumber), uint64(txCIDWithHeaderInfo.Index), err
}
// extractLogsOfInterest returns logs from the receipt IPLD
@ -380,7 +422,7 @@ func sliceContainsHash(slice []string, hash common.Hash) int {
// a `PublicEthAPI`.
func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) {
fields := RPCMarshalHeader(header)
td, err := pea.b.GetTd(header.Hash())
td, err := pea.B.GetTd(header.Hash())
if err != nil {
return nil, err
}
@ -419,7 +461,7 @@ func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx boo
if err != nil {
return nil, err
}
td, err := pea.b.GetTd(b.Hash())
td, err := pea.B.GetTd(b.Hash())
if err != nil {
return nil, err
}

View File

@ -65,17 +65,26 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
return nil, true, fmt.Errorf("eth retriever expected filter type %T got %T", &SubscriptionSettings{}, filter)
}
log.Debug("retrieving cids")
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return nil, true, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Retrieve cached header CIDs at this block height
headers, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return nil, true, err
}
@ -91,9 +100,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
// Retrieve uncle cids for this header id
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return nil, true, err
}
@ -104,9 +110,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
if !streamFilter.TxFilter.Off {
cw.Transactions, err = ecr.RetrieveTxCIDs(tx, streamFilter.TxFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("transaction cid retrieval error")
return nil, true, err
}
@ -122,9 +125,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
if !streamFilter.ReceiptFilter.Off {
cw.Receipts, err = ecr.RetrieveRctCIDsByHeaderID(tx, streamFilter.ReceiptFilter, header.ID, trxIds)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("receipt cid retrieval error")
return nil, true, err
}
@ -136,9 +136,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
if !streamFilter.StateFilter.Off {
cw.StateNodes, err = ecr.RetrieveStateCIDs(tx, streamFilter.StateFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("state cid retrieval error")
return nil, true, err
}
@ -150,9 +147,6 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
if !streamFilter.StorageFilter.Off {
cw.StorageNodes, err = ecr.RetrieveStorageCIDs(tx, streamFilter.StorageFilter, header.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("storage cid retrieval error")
return nil, true, err
}
@ -163,7 +157,7 @@ func (ecr *CIDRetriever) Retrieve(filter shared.SubscriptionSettings, blockNumbe
cws[i] = cw
}
return cws, empty, tx.Commit()
return cws, empty, err
}
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
@ -487,31 +481,35 @@ func (ecr *CIDRetriever) RetrieveGapsInData(validationLevel int) ([]shared.Gap,
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, nil, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerCID, err := ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
@ -521,27 +519,33 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (HeaderModel
}
rctCIDs, err := ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("rct cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
return headerCID, uncleCIDs, txCIDs, rctCIDs, tx.Commit()
return headerCID, uncleCIDs, txCIDs, rctCIDs, err
}
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel, []UncleModel, []TxModel, []ReceiptModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx
tx, err := ecr.db.Beginx()
if err != nil {
return HeaderModel{}, nil, nil, nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerCID, err := ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("header cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
@ -550,17 +554,11 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
}
uncleCIDs, err := ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("uncle cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
txCIDs, err := ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("tx cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
@ -570,13 +568,9 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (HeaderModel,
}
rctCIDs, err := ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("rct cid retrieval error")
return HeaderModel{}, nil, nil, nil, err
}
return headerCID[0], uncleCIDs, txCIDs, rctCIDs, tx.Commit()
return headerCID[0], uncleCIDs, txCIDs, rctCIDs, err
}
// RetrieveHeaderCIDByHash returns the header for the given block hash

View File

@ -50,9 +50,7 @@ func (c *Cleaner) ResetValidation(rngs [][2]uint64) error {
SET times_validated = 0
WHERE block_number BETWEEN $1 AND $2`
if _, err := tx.Exec(pgStr, rng[0], rng[1]); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
shared.Rollback(tx)
return err
}
}
@ -68,9 +66,7 @@ func (c *Cleaner) Clean(rngs [][2]uint64, t shared.DataType) error {
for _, rng := range rngs {
logrus.Infof("eth db cleaner cleaning up block range %d to %d", rng[0], rng[1])
if err := c.clean(tx, rng, t); err != nil {
if err := tx.Rollback(); err != nil {
logrus.Error(err)
}
shared.Rollback(tx)
return err
}
}

View File

@ -50,42 +50,43 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
if !ok {
return fmt.Errorf("eth indexer expected cids type %T got %T", &CIDPayload{}, cids)
}
// Begin new db tx
tx, err := in.db.Beginx()
if err != nil {
return err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
headerID, err := in.indexHeaderCID(tx, cidPayload.HeaderCID)
if err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing header")
return err
}
for _, uncle := range cidPayload.UncleCIDs {
if err := in.indexUncleCID(tx, uncle, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing uncle")
return err
}
}
if err := in.indexTransactionAndReceiptCIDs(tx, cidPayload, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
log.Error("eth indexer error when indexing transactions and receipts")
return err
}
if err := in.indexStateAndStorageCIDs(tx, cidPayload, headerID); err != nil {
if err := tx.Rollback(); err != nil {
log.Error(err)
}
err = in.indexStateAndStorageCIDs(tx, cidPayload, headerID)
if err != nil {
log.Error("eth indexer error when indexing state and storage nodes")
return err
}
return tx.Commit()
return err
}
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {

View File

@ -50,48 +50,53 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
return nil, fmt.Errorf("eth fetcher: expected cids type %T got %T", &CIDWrapper{}, cids)
}
log.Debug("fetching iplds")
var err error
iplds := IPLDs{}
iplds.TotalDifficulty, ok = new(big.Int).SetString(cidWrapper.Header.TotalDifficulty, 10)
if !ok {
return nil, errors.New("eth fetcher: unable to set total difficulty")
}
iplds.BlockNumber = cidWrapper.BlockNumber
tx, err := f.db.Beginx()
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
iplds.Header, err = f.FetchHeader(tx, cidWrapper.Header)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: header fetching error: %s", err.Error())
}
iplds.Uncles, err = f.FetchUncles(tx, cidWrapper.Uncles)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: uncle fetching error: %s", err.Error())
}
iplds.Transactions, err = f.FetchTrxs(tx, cidWrapper.Transactions)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: transaction fetching error: %s", err.Error())
}
iplds.Receipts, err = f.FetchRcts(tx, cidWrapper.Receipts)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: receipt fetching error: %s", err.Error())
}
iplds.StateNodes, err = f.FetchState(tx, cidWrapper.StateNodes)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: state fetching error: %s", err.Error())
}
iplds.StorageNodes, err = f.FetchStorage(tx, cidWrapper.StorageNodes)
if err != nil {
shared.Rollback(tx)
return nil, fmt.Errorf("eth pg fetcher: storage fetching error: %s", err.Error())
}
return iplds, tx.Commit()
return iplds, err
}
// FetchHeaders fetches headers

View File

@ -62,24 +62,31 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
if err != nil {
return nil, err
}
defer func() {
if p := recover(); p != nil {
shared.Rollback(tx)
panic(p)
} else if err != nil {
shared.Rollback(tx)
} else {
err = tx.Commit()
}
}()
// Publish trie nodes
for _, node := range txTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
shared.Rollback(tx)
return nil, err
}
}
for _, node := range rctTrieNodes {
if err := shared.PublishIPLD(tx, node); err != nil {
shared.Rollback(tx)
return nil, err
}
}
// Publish and index header
if err := shared.PublishIPLD(tx, headerNode); err != nil {
shared.Rollback(tx)
return nil, err
}
reward := common2.CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
@ -99,14 +106,12 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
headerID, err := pub.indexer.indexHeaderCID(tx, header)
if err != nil {
shared.Rollback(tx)
return nil, err
}
// Publish and index uncles
for _, uncleNode := range uncleNodes {
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
shared.Rollback(tx)
return nil, err
}
uncleReward := common2.CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64())
@ -117,7 +122,6 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
Reward: uncleReward.String(),
}
if err := pub.indexer.indexUncleCID(tx, uncle, headerID); err != nil {
shared.Rollback(tx)
return nil, err
}
}
@ -125,37 +129,30 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
// Publish and index txs and receipts
for i, txNode := range txNodes {
if err := shared.PublishIPLD(tx, txNode); err != nil {
shared.Rollback(tx)
return nil, err
}
rctNode := rctNodes[i]
if err := shared.PublishIPLD(tx, rctNode); err != nil {
shared.Rollback(tx)
return nil, err
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
shared.Rollback(tx)
return nil, err
}
rctModel := ipldPayload.ReceiptMetaData[i]
rctModel.CID = rctNode.Cid().String()
if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil {
shared.Rollback(tx)
return nil, err
}
}
// Publish and index state and storage
if err := pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID); err != nil {
shared.Rollback(tx)
return nil, err
}
err = pub.publishAndIndexStateAndStorage(tx, ipldPayload, headerID)
// This IPLDPublisher does both publishing and indexing, we do not need to pass anything forward to the indexer
return nil, tx.Commit()
return nil, err // return err variable explicity so that we return the err = tx.Commit() assignment in the defer
}
func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx, ipldPayload ConvertedPayload, headerID int64) error {
@ -166,7 +163,6 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
return err
}
if err := shared.PublishIPLD(tx, stateIPLD); err != nil {
shared.Rollback(tx)
return err
}
stateModel := StateNodeModel{