Implement single query for transactions and blockByMhKey
This commit is contained in:
parent
fe7329c284
commit
da1c8b2332
1
go.mod
1
go.mod
@ -23,6 +23,7 @@ require (
|
|||||||
github.com/spf13/cobra v1.4.0
|
github.com/spf13/cobra v1.4.0
|
||||||
github.com/spf13/viper v1.11.0
|
github.com/spf13/viper v1.11.0
|
||||||
github.com/vulcanize/eth-ipfs-state-validator/v3 v3.0.2
|
github.com/vulcanize/eth-ipfs-state-validator/v3 v3.0.2
|
||||||
|
github.com/thoas/go-funk v0.9.2
|
||||||
github.com/vulcanize/gap-filler v0.3.1
|
github.com/vulcanize/gap-filler v0.3.1
|
||||||
github.com/vulcanize/ipfs-ethdb/v3 v3.0.3
|
github.com/vulcanize/ipfs-ethdb/v3 v3.0.3
|
||||||
)
|
)
|
||||||
|
@ -26,6 +26,7 @@ import (
|
|||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
"github.com/lib/pq"
|
"github.com/lib/pq"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/thoas/go-funk"
|
||||||
|
|
||||||
"github.com/vulcanize/ipld-eth-server/v3/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/v3/pkg/shared"
|
||||||
)
|
)
|
||||||
@ -592,6 +593,17 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID string)
|
|||||||
return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
|
return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ecr *CIDRetriever) RetrieveTxCIDsByBlockNumber(tx *sqlx.Tx, blockNumber int64) ([]models.TxModel, error) {
|
||||||
|
log.Debug("retrieving tx cids for block number ", blockNumber)
|
||||||
|
pgStr := `SELECT CAST(block_number as Text), header_id, index, tx_hash, cid, mh_key,
|
||||||
|
dst, src, tx_data, tx_type, value
|
||||||
|
FROM eth.transaction_cids
|
||||||
|
WHERE block_number = $1
|
||||||
|
ORDER BY index`
|
||||||
|
var txCIDs []models.TxModel
|
||||||
|
return txCIDs, tx.Select(&txCIDs, pgStr, blockNumber)
|
||||||
|
}
|
||||||
|
|
||||||
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
|
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
|
||||||
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txHashes []string) ([]models.ReceiptModel, error) {
|
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txHashes []string) ([]models.ReceiptModel, error) {
|
||||||
log.Debugf("retrieving receipt cids for tx hashes %v", txHashes)
|
log.Debugf("retrieving receipt cids for tx hashes %v", txHashes)
|
||||||
@ -635,13 +647,30 @@ func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockNumber(blockNumber int64)
|
|||||||
}
|
}
|
||||||
|
|
||||||
var allTxCIDs [][]models.TxModel
|
var allTxCIDs [][]models.TxModel
|
||||||
for _, headerCID := range headerCIDs {
|
txCIDs, err := ecr.RetrieveTxCIDsByBlockNumber(tx, blockNumber)
|
||||||
var txCIDs []models.TxModel
|
|
||||||
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("tx cid retrieval error")
|
log.Error("tx cid retrieval error")
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
txCIDsByHeaderID := funk.Reduce(
|
||||||
|
txCIDs,
|
||||||
|
func(acc map[string][]models.TxModel, txCID models.TxModel) map[string][]models.TxModel {
|
||||||
|
if _, ok := acc[txCID.HeaderID]; !ok {
|
||||||
|
acc[txCID.HeaderID] = []models.TxModel{}
|
||||||
|
}
|
||||||
|
|
||||||
|
txCIDs = append(acc[txCID.HeaderID], txCID)
|
||||||
|
acc[txCID.HeaderID] = txCIDs
|
||||||
|
return acc
|
||||||
|
},
|
||||||
|
make(map[string][]models.TxModel),
|
||||||
|
)
|
||||||
|
|
||||||
|
txCIDsByHeaderIDMap := txCIDsByHeaderID.(map[string][]models.TxModel)
|
||||||
|
|
||||||
|
for _, headerCID := range headerCIDs {
|
||||||
|
txCIDs := txCIDsByHeaderIDMap[headerCID.BlockHash]
|
||||||
allTxCIDs = append(allTxCIDs, txCIDs)
|
allTxCIDs = append(allTxCIDs, txCIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -676,7 +705,6 @@ func (ecr *CIDRetriever) RetrieveHeaderAndTxCIDsByBlockHash(blockHash common.Has
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return models.HeaderModel{}, nil, err
|
return models.HeaderModel{}, nil, err
|
||||||
}
|
}
|
||||||
fmt.Println("RetrieveHeaderAndTxCIDsByBlockHash", headerCID.ParentHash, headerCID.Timestamp)
|
|
||||||
|
|
||||||
var txCIDs []models.TxModel
|
var txCIDs []models.TxModel
|
||||||
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash)
|
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.BlockHash)
|
||||||
|
@ -20,11 +20,13 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/jmoiron/sqlx"
|
"github.com/jmoiron/sqlx"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/thoas/go-funk"
|
||||||
"github.com/vulcanize/ipld-eth-server/v3/pkg/shared"
|
"github.com/vulcanize/ipld-eth-server/v3/pkg/shared"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -99,7 +101,7 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) {
|
|||||||
return iplds, err
|
return iplds, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// FetchHeaders fetches headers
|
// FetchHeader fetches header
|
||||||
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) {
|
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPLDModel, error) {
|
||||||
log.Debug("fetching header ipld")
|
log.Debug("fetching header ipld")
|
||||||
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
|
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
|
||||||
@ -112,6 +114,38 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (models.IPL
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FetchHeaders fetches headers
|
||||||
|
func (f *IPLDFetcher) FetchHeaders(tx *sqlx.Tx, cids []models.HeaderModel) ([]models.IPLDModel, error) {
|
||||||
|
log.Debug("fetching header iplds")
|
||||||
|
headerIPLDs := make([]models.IPLDModel, len(cids))
|
||||||
|
|
||||||
|
blockNumbers := make([]uint64, len(cids))
|
||||||
|
mhKeys := make([]string, len(cids))
|
||||||
|
for i, c := range cids {
|
||||||
|
var err error
|
||||||
|
mhKeys[i] = c.MhKey
|
||||||
|
blockNumbers[i], err = strconv.ParseUint(c.BlockNumber, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fetchedIPLDs, err := shared.FetchIPLDsByMhKeysAndBlockNumbers(tx, mhKeys, blockNumbers)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, c := range cids {
|
||||||
|
headerIPLD := funk.Find(fetchedIPLDs, func(ipld models.IPLDModel) bool {
|
||||||
|
return ipld.Key == c.MhKey
|
||||||
|
}).(models.IPLDModel)
|
||||||
|
|
||||||
|
headerIPLDs[i] = headerIPLD
|
||||||
|
}
|
||||||
|
|
||||||
|
return headerIPLDs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// FetchUncles fetches uncles
|
// FetchUncles fetches uncles
|
||||||
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) {
|
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]models.IPLDModel, error) {
|
||||||
log.Debug("fetching uncle iplds")
|
log.Debug("fetching uncle iplds")
|
||||||
|
@ -1161,13 +1161,33 @@ func (transactionCIDResult EthTransactionCidsConnection) Nodes(ctx context.Conte
|
|||||||
return transactionCIDResult.nodes
|
return transactionCIDResult.nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IPFSBlock struct {
|
||||||
|
key string
|
||||||
|
data string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b IPFSBlock) Key(ctx context.Context) string {
|
||||||
|
return b.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b IPFSBlock) Data(ctx context.Context) string {
|
||||||
|
return b.data
|
||||||
|
}
|
||||||
|
|
||||||
type EthHeaderCid struct {
|
type EthHeaderCid struct {
|
||||||
cid string
|
cid string
|
||||||
blockNumber BigInt
|
blockNumber BigInt
|
||||||
blockHash string
|
blockHash string
|
||||||
parentHash string
|
parentHash string
|
||||||
timestamp BigInt
|
timestamp BigInt
|
||||||
|
stateRoot string
|
||||||
|
td BigInt
|
||||||
|
txRoot string
|
||||||
|
receiptRoot string
|
||||||
|
uncleRoot string
|
||||||
|
bloom string
|
||||||
transactions []*EthTransactionCid
|
transactions []*EthTransactionCid
|
||||||
|
ipfsBlock IPFSBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h EthHeaderCid) Cid(ctx context.Context) string {
|
func (h EthHeaderCid) Cid(ctx context.Context) string {
|
||||||
@ -1190,10 +1210,38 @@ func (h EthHeaderCid) Timestamp(ctx context.Context) BigInt {
|
|||||||
return h.timestamp
|
return h.timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) StateRoot(ctx context.Context) string {
|
||||||
|
return h.stateRoot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) Td(ctx context.Context) BigInt {
|
||||||
|
return h.td
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) TxRoot(ctx context.Context) string {
|
||||||
|
return h.txRoot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) ReceiptRoot(ctx context.Context) string {
|
||||||
|
return h.receiptRoot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) UncleRoot(ctx context.Context) string {
|
||||||
|
return h.uncleRoot
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) Bloom(ctx context.Context) string {
|
||||||
|
return h.bloom
|
||||||
|
}
|
||||||
|
|
||||||
func (h EthHeaderCid) EthTransactionCidsByHeaderId(ctx context.Context) EthTransactionCidsConnection {
|
func (h EthHeaderCid) EthTransactionCidsByHeaderId(ctx context.Context) EthTransactionCidsConnection {
|
||||||
return EthTransactionCidsConnection{nodes: h.transactions}
|
return EthTransactionCidsConnection{nodes: h.transactions}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h EthHeaderCid) BlockByMhKey(ctx context.Context) IPFSBlock {
|
||||||
|
return h.ipfsBlock
|
||||||
|
}
|
||||||
|
|
||||||
type EthHeaderCidsConnection struct {
|
type EthHeaderCidsConnection struct {
|
||||||
nodes []*EthHeaderCid
|
nodes []*EthHeaderCid
|
||||||
}
|
}
|
||||||
@ -1230,6 +1278,17 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct {
|
|||||||
return nil, fmt.Errorf("provide block number or block hash")
|
return nil, fmt.Errorf("provide block number or block hash")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Begin tx
|
||||||
|
tx, err := r.backend.DB.Beginx()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
headerIPLDs, err := r.backend.Fetcher.FetchHeaders(tx, headerCIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
var resultNodes []*EthHeaderCid
|
var resultNodes []*EthHeaderCid
|
||||||
for idx, headerCID := range headerCIDs {
|
for idx, headerCID := range headerCIDs {
|
||||||
var blockNumber BigInt
|
var blockNumber BigInt
|
||||||
@ -1238,12 +1297,21 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct {
|
|||||||
var timestamp BigInt
|
var timestamp BigInt
|
||||||
timestamp.SetUint64(headerCID.Timestamp)
|
timestamp.SetUint64(headerCID.Timestamp)
|
||||||
|
|
||||||
|
var td BigInt
|
||||||
|
td.UnmarshalText([]byte(headerCID.TotalDifficulty))
|
||||||
|
|
||||||
ethHeaderCidNode := EthHeaderCid{
|
ethHeaderCidNode := EthHeaderCid{
|
||||||
cid: headerCID.CID,
|
cid: headerCID.CID,
|
||||||
blockNumber: blockNumber,
|
blockNumber: blockNumber,
|
||||||
blockHash: headerCID.BlockHash,
|
blockHash: headerCID.BlockHash,
|
||||||
parentHash: headerCID.ParentHash,
|
parentHash: headerCID.ParentHash,
|
||||||
timestamp: timestamp,
|
timestamp: timestamp,
|
||||||
|
stateRoot: headerCID.StateRoot,
|
||||||
|
td: td,
|
||||||
|
txRoot: headerCID.TxRoot,
|
||||||
|
receiptRoot: headerCID.RctRoot,
|
||||||
|
uncleRoot: headerCID.UncleRoot,
|
||||||
|
bloom: hexutil.Bytes(headerCID.Bloom).String(),
|
||||||
}
|
}
|
||||||
|
|
||||||
txCIDs := allTxCIDs[idx]
|
txCIDs := allTxCIDs[idx]
|
||||||
@ -1257,6 +1325,11 @@ func (r *Resolver) AllEthHeaderCids(ctx context.Context, args struct {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ethHeaderCidNode.ipfsBlock = IPFSBlock{
|
||||||
|
key: headerIPLDs[idx].Key,
|
||||||
|
data: hexutil.Bytes(headerIPLDs[idx].Data).String(),
|
||||||
|
}
|
||||||
|
|
||||||
resultNodes = append(resultNodes, ðHeaderCidNode)
|
resultNodes = append(resultNodes, ðHeaderCidNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,6 +29,8 @@ const schema string = `
|
|||||||
scalar BigInt
|
scalar BigInt
|
||||||
# Long is a 64 bit unsigned integer.
|
# Long is a 64 bit unsigned integer.
|
||||||
scalar Long
|
scalar Long
|
||||||
|
# BigFloat is a floating point number.
|
||||||
|
scalar BigFloat
|
||||||
|
|
||||||
schema {
|
schema {
|
||||||
query: Query
|
query: Query
|
||||||
@ -298,13 +300,28 @@ const schema string = `
|
|||||||
nodes: [EthTransactionCid]!
|
nodes: [EthTransactionCid]!
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IPFSBlock {
|
||||||
|
key: String!
|
||||||
|
data: String!
|
||||||
|
}
|
||||||
|
|
||||||
type EthHeaderCid {
|
type EthHeaderCid {
|
||||||
cid: String!
|
cid: String!
|
||||||
blockNumber: BigInt!
|
blockNumber: BigInt!
|
||||||
blockHash: String!
|
blockHash: String!
|
||||||
parentHash: String!
|
parentHash: String!
|
||||||
timestamp: BigInt!
|
timestamp: BigInt!
|
||||||
|
stateRoot: String!
|
||||||
|
|
||||||
|
# TODO: Use BigFloat
|
||||||
|
td: BigInt!
|
||||||
|
|
||||||
|
txRoot: String!
|
||||||
|
receiptRoot: String!
|
||||||
|
uncleRoot: String!
|
||||||
|
bloom: String!
|
||||||
ethTransactionCidsByHeaderId: EthTransactionCidsConnection!
|
ethTransactionCidsByHeaderId: EthTransactionCidsConnection!
|
||||||
|
blockByMhKey: IPFSBlock!
|
||||||
}
|
}
|
||||||
|
|
||||||
type EthHeaderCidsConnection {
|
type EthHeaderCidsConnection {
|
||||||
|
@ -19,6 +19,7 @@ package shared
|
|||||||
import (
|
import (
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||||
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||||
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
dshelp "github.com/ipfs/go-ipfs-ds-help"
|
||||||
@ -77,6 +78,19 @@ func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) {
|
|||||||
return block, tx.Get(&block, pgStr, mhKey)
|
return block, tx.Get(&block, pgStr, mhKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FetchIPLDByMhKeysAndBlockNumbers is used to retrieve iplds from Postgres blockstore with the provided tx, mhkey strings and blockNumbers
|
||||||
|
func FetchIPLDsByMhKeysAndBlockNumbers(tx *sqlx.Tx, mhKeys []string, blockNumbers []uint64) ([]models.IPLDModel, error) {
|
||||||
|
var blocks []models.IPLDModel
|
||||||
|
pgStr := `SELECT key, data, block_number FROM public.blocks WHERE key IN (?) AND block_number IN (?)`
|
||||||
|
query, args, err := sqlx.In(pgStr, mhKeys, blockNumbers)
|
||||||
|
if err != nil {
|
||||||
|
return blocks, err
|
||||||
|
}
|
||||||
|
query = tx.Rebind(query)
|
||||||
|
|
||||||
|
return blocks, tx.Select(&blocks, query, args...)
|
||||||
|
}
|
||||||
|
|
||||||
// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string
|
// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string
|
||||||
func MultihashKeyFromCID(c cid.Cid) string {
|
func MultihashKeyFromCID(c cid.Cid) string {
|
||||||
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
||||||
|
Loading…
Reference in New Issue
Block a user