adjust everything to work with mh fks

This commit is contained in:
Ian Norden 2020-08-04 22:34:49 -05:00
parent 77490e8b4b
commit 77b7bcc94c
27 changed files with 501 additions and 362 deletions

View File

@ -130,7 +130,7 @@ func (bcr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
results := make([]TxModel, 0)
id := 1
pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key,
transaction_cids.segwit, transaction_cids.witness_hash, transaction_cids.index
FROM btc.transaction_cids, btc.header_cids, btc.tx_inputs, btc.tx_outputs
WHERE transaction_cids.header_id = header_cids.id

View File

@ -160,7 +160,7 @@ func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING btc.transaction_cids B, btc.header_cids C
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
@ -179,7 +179,7 @@ func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING btc.header_cids B
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err

View File

@ -33,25 +33,30 @@ var (
// header variables
blockHash1 = crypto.Keccak256Hash([]byte{00, 02})
blocKNumber1 = big.NewInt(0)
headerCid1 = "mockHeader1CID"
headerCid1 = shared.TestCID([]byte("mockHeader1CID"))
headerMhKey1 = shared.MultihashKeyFromCID(headerCid1)
parentHash = crypto.Keccak256Hash([]byte{00, 01})
headerModel1 = btc.HeaderModel{
BlockHash: blockHash1.String(),
BlockNumber: blocKNumber1.String(),
ParentHash: parentHash.String(),
CID: headerCid1,
CID: headerCid1.String(),
MhKey: headerMhKey1,
}
// tx variables
tx1CID = "mockTx1CID"
tx2CID = "mockTx2CID"
tx1CID = shared.TestCID([]byte("mockTx1CID"))
tx1MhKey = shared.MultihashKeyFromCID(tx1CID)
tx2CID = shared.TestCID([]byte("mockTx2CID"))
tx2MhKey = shared.MultihashKeyFromCID(tx2CID)
tx1Hash = crypto.Keccak256Hash([]byte{01, 01})
tx2Hash = crypto.Keccak256Hash([]byte{01, 02})
opHash = crypto.Keccak256Hash([]byte{02, 01})
txModels1 = []btc.TxModelWithInsAndOuts{
{
Index: 0,
CID: tx1CID,
CID: tx1CID.String(),
MhKey: tx1MhKey,
TxHash: tx1Hash.String(),
SegWit: true,
TxInputs: []btc.TxInput{
@ -75,7 +80,8 @@ var (
},
{
Index: 1,
CID: tx2CID,
CID: tx2CID.String(),
MhKey: tx2MhKey,
TxHash: tx2Hash.String(),
SegWit: true,
},
@ -89,21 +95,25 @@ var (
// header variables
blockHash2 = crypto.Keccak256Hash([]byte{00, 03})
blocKNumber2 = big.NewInt(1)
headerCid2 = "mockHeaderCID2"
headerCid2 = shared.TestCID([]byte("mockHeaderCID2"))
headerMhKey2 = shared.MultihashKeyFromCID(headerCid2)
headerModel2 = btc.HeaderModel{
BlockNumber: blocKNumber2.String(),
BlockHash: blockHash2.String(),
ParentHash: blockHash1.String(),
CID: headerCid2,
CID: headerCid2.String(),
MhKey: headerMhKey2,
}
// tx variables
tx3CID = "mockTx3CID"
tx3CID = shared.TestCID([]byte("mockTx3CID"))
tx3MhKey = shared.MultihashKeyFromCID(tx3CID)
tx3Hash = crypto.Keccak256Hash([]byte{01, 03})
txModels2 = []btc.TxModelWithInsAndOuts{
{
Index: 0,
CID: tx3CID,
CID: tx3CID.String(),
MhKey: tx3MhKey,
TxHash: tx3Hash.String(),
SegWit: true,
},
@ -113,12 +123,12 @@ var (
TransactionCIDs: txModels2,
}
rngs = [][2]uint64{{0, 1}}
cids = []string{
headerCid1,
headerCid2,
tx1CID,
tx2CID,
tx3CID,
mhKeys = []string{
headerMhKey1,
headerMhKey2,
tx1MhKey,
tx2MhKey,
tx3MhKey,
}
mockData = []byte{'\x01'}
)
@ -139,16 +149,15 @@ var _ = Describe("Cleaner", func() {
Describe("Clean", func() {
BeforeEach(func() {
for _, key := range mhKeys {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData)
Expect(err).ToNot(HaveOccurred())
}
err := repo.Index(mockCIDPayload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(mockCIDPayload2)
Expect(err).ToNot(HaveOccurred())
for _, cid := range cids {
_, err = db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, cid, mockData)
Expect(err).ToNot(HaveOccurred())
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
var startingIPFSBlocksCount int
@ -286,6 +295,11 @@ var _ = Describe("Cleaner", func() {
Describe("ResetValidation", func() {
BeforeEach(func() {
for _, key := range mhKeys {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData)
Expect(err).ToNot(HaveOccurred())
}
err := repo.Index(mockCIDPayload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(mockCIDPayload2)

View File

@ -74,11 +74,11 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
var headerID int64
err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id, times_validated)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id, times_validated) = ($3, $4, $5, $6, $7, btc.header_cids.times_validated + 1)
err := tx.QueryRowx(`INSERT INTO btc.header_cids (block_number, block_hash, parent_hash, cid, timestamp, bits, node_id, mh_key, times_validated)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, timestamp, bits, node_id, mh_key, times_validated) = ($3, $4, $5, $6, $7, $8, btc.header_cids.times_validated + 1)
RETURNING id`,
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, in.db.NodeID, 1).Scan(&headerID)
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.Timestamp, header.Bits, in.db.NodeID, header.MhKey, 1).Scan(&headerID)
return headerID, err
}
@ -107,11 +107,11 @@ func (in *CIDIndexer) indexTransactionCIDs(tx *sqlx.Tx, transactions []TxModelWi
func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModelWithInsAndOuts, headerID int64) (int64, error) {
var txID int64
err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, segwit, witness_hash)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (tx_hash) DO UPDATE SET (header_id, index, cid, segwit, witness_hash) = ($1, $3, $4, $5, $6)
err := tx.QueryRowx(`INSERT INTO btc.transaction_cids (header_id, tx_hash, index, cid, segwit, witness_hash, mh_key)
VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (tx_hash) DO UPDATE SET (header_id, index, cid, segwit, witness_hash, mh_key) = ($1, $3, $4, $5, $6, $7)
RETURNING id`,
headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.SegWit, transaction.WitnessHash).Scan(&txID)
headerID, transaction.TxHash, transaction.Index, transaction.CID, transaction.SegWit, transaction.WitnessHash, transaction.MhKey).Scan(&txID)
return txID, err
}

View File

@ -31,11 +31,17 @@ var _ = Describe("Indexer", func() {
db *postgres.DB
err error
repo *btc.CIDIndexer
mockData = []byte{1, 2, 3}
)
BeforeEach(func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = btc.NewCIDIndexer(db)
// need entries in the public.blocks with the mhkeys or the FK constraint will fail
shared.PublishMockIPLD(db, mocks.MockHeaderMhKey, mockData)
shared.PublishMockIPLD(db, mocks.MockTrxMhKey1, mockData)
shared.PublishMockIPLD(db, mocks.MockTrxMhKey2, mockData)
shared.PublishMockIPLD(db, mocks.MockTrxMhKey3, mockData)
})
AfterEach(func() {
btc.TearDownDB(db)
@ -43,6 +49,7 @@ var _ = Describe("Indexer", func() {
Describe("Index", func() {
It("Indexes CIDs and related metadata into vulcanizedb", func() {
err = repo.Index(&mocks.MockCIDPayload)
Expect(err).ToNot(HaveOccurred())
pgStr := `SELECT * FROM btc.header_cids
@ -72,13 +79,13 @@ var _ = Describe("Indexer", func() {
Expect(tx.WitnessHash).To(Equal(""))
switch tx.Index {
case 0:
Expect(tx.CID).To(Equal("mockTrxCID1"))
Expect(tx.CID).To(Equal(mocks.MockTrxCID1.String()))
Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[0].TxHash().String()))
case 1:
Expect(tx.CID).To(Equal("mockTrxCID2"))
Expect(tx.CID).To(Equal(mocks.MockTrxCID2.String()))
Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[1].TxHash().String()))
case 2:
Expect(tx.CID).To(Equal("mockTrxCID3"))
Expect(tx.CID).To(Equal(mocks.MockTrxCID3.String()))
Expect(tx.TxHash).To(Equal(mocks.MockBlock.Transactions[2].TxHash().String()))
}
}

View File

@ -79,7 +79,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLD(tx, c.CID)
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return ipfs.BlockModel{}, err
}
@ -94,7 +94,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
trxBytes, err := shared.FetchIPLD(tx, c.CID)
trxBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}

View File

@ -25,11 +25,19 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/btc"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
)
var (
MockHeaderCID = shared.TestCID([]byte("MockHeaderCID"))
MockTrxCID1 = shared.TestCID([]byte("MockTrxCID1"))
MockTrxCID2 = shared.TestCID([]byte("MockTrxCID2"))
MockTrxCID3 = shared.TestCID([]byte("MockTrxCID3"))
MockHeaderMhKey = shared.MultihashKeyFromCID(MockHeaderCID)
MockTrxMhKey1 = shared.MultihashKeyFromCID(MockTrxCID1)
MockTrxMhKey2 = shared.MultihashKeyFromCID(MockTrxCID2)
MockTrxMhKey3 = shared.MultihashKeyFromCID(MockTrxCID3)
MockBlockHeight int64 = 1337
MockBlock = wire.MsgBlock{
Header: wire.BlockHeader{
@ -479,7 +487,8 @@ var (
}
MockTxsMetaDataPostPublish = []btc.TxModelWithInsAndOuts{
{
CID: "mockTrxCID1",
CID: MockTrxCID1.String(),
MhKey: MockTrxMhKey1,
TxHash: MockBlock.Transactions[0].TxHash().String(),
Index: 0,
SegWit: MockBlock.Transactions[0].HasWitness(),
@ -517,7 +526,8 @@ var (
},
},
{
CID: "mockTrxCID2",
CID: MockTrxCID2.String(),
MhKey: MockTrxMhKey2,
TxHash: MockBlock.Transactions[1].TxHash().String(),
Index: 1,
SegWit: MockBlock.Transactions[1].HasWitness(),
@ -594,7 +604,8 @@ var (
},
},
{
CID: "mockTrxCID3",
CID: MockTrxCID3.String(),
MhKey: MockTrxMhKey3,
TxHash: MockBlock.Transactions[2].TxHash().String(),
Index: 2,
SegWit: MockBlock.Transactions[2].HasWitness(),
@ -671,7 +682,8 @@ var (
},
}
MockHeaderMetaData = btc.HeaderModel{
CID: "mockHeaderCID",
CID: MockHeaderCID.String(),
MhKey: MockHeaderMhKey,
ParentHash: MockBlock.Header.PrevBlock.String(),
BlockNumber: strconv.Itoa(int(MockBlockHeight)),
BlockHash: MockBlock.Header.BlockHash().String(),
@ -686,53 +698,6 @@ var (
HeaderCID: MockHeaderMetaData,
TransactionCIDs: MockTxsMetaDataPostPublish,
}
DummyCIDPayloadForFKReference = btc.CIDPayload{
HeaderCID: btc.HeaderModel{
CID: "dummyHeader",
ParentHash: "",
BlockHash: "",
BlockNumber: "1336",
Bits: 1,
Timestamp: 1000000000,
},
TransactionCIDs: []btc.TxModelWithInsAndOuts{
{
TxHash: "87a157f3fd88ac7907c05fc55e271dc4acdc5605d187d646604ca8c0e9382e03",
CID: "dummyTx1",
Index: 0,
TxOutputs: []btc.TxOutput{
{
Index: 0,
RequiredSigs: 0,
Value: 0,
PkScript: []byte{},
ScriptClass: 0,
},
},
},
{
TxHash: "cf4e2978d0611ce46592e02d7e7daf8627a316ab69759a9f3df109a7f2bf3ec3",
CID: "dummyTx2",
Index: 1,
TxOutputs: []btc.TxOutput{
{
Index: 0,
RequiredSigs: 0,
Value: 0,
PkScript: []byte{},
ScriptClass: 0,
},
{
Index: 1,
RequiredSigs: 0,
Value: 0,
PkScript: []byte{},
ScriptClass: 0,
},
},
},
},
}
)
func stringSliceFromAddresses(addrs []btcutil.Address) []string {

View File

@ -25,6 +25,7 @@ type HeaderModel struct {
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Timestamp int64 `db:"timestamp"`
Bits uint32 `db:"bits"`
NodeID int64 `db:"node_id"`
@ -38,6 +39,7 @@ type TxModel struct {
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
SegWit bool `db:"segwit"`
WitnessHash string `db:"witness_hash"`
}
@ -49,6 +51,7 @@ type TxModelWithInsAndOuts struct {
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
SegWit bool `db:"segwit"`
WitnessHash string `db:"witness_hash"`
TxInputs []TxInput

View File

@ -80,6 +80,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
header := HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: ipldPayload.Header.PrevBlock.String(),
BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)),
BlockHash: ipldPayload.Header.BlockHash().String(),
@ -98,6 +99,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err

View File

@ -62,8 +62,10 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
if err != nil {
return nil, err
}
mhKey, _ := shared.MultihashKeyFromCIDString(headerCid)
header := HeaderModel{
CID: headerCid,
MhKey: mhKey,
ParentHash: ipldPayload.Header.PrevBlock.String(),
BlockNumber: strconv.Itoa(int(ipldPayload.BlockPayload.BlockHeight)),
BlockHash: ipldPayload.Header.BlockHash().String(),
@ -97,8 +99,10 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.BtcTx, txTrie
if err != nil {
return nil, err
}
mhKey, _ := shared.MultihashKeyFromCIDString(cid)
txCids[i] = TxModelWithInsAndOuts{
CID: cid,
MhKey: mhKey,
Index: trxMeta[i].Index,
TxHash: trxMeta[i].TxHash,
SegWit: trxMeta[i].SegWit,

View File

@ -57,12 +57,12 @@ var _ = Describe("Publisher", func() {
Expect(err).ToNot(HaveOccurred())
tx3Bytes := by.Bytes()
mockHeaderDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(headerBytes): "mockHeaderCID",
common.BytesToHash(headerBytes): mocks.MockHeaderCID.String(),
}
mockTrxDagPutter.CIDsToReturn = map[common.Hash]string{
common.BytesToHash(tx1Bytes): "mockTrxCID1",
common.BytesToHash(tx2Bytes): "mockTrxCID2",
common.BytesToHash(tx3Bytes): "mockTrxCID3",
common.BytesToHash(tx1Bytes): mocks.MockTrxCID1.String(),
common.BytesToHash(tx2Bytes): mocks.MockTrxCID2.String(),
common.BytesToHash(tx3Bytes): mocks.MockTrxCID3.String(),
}
publisher := btc.IPLDPublisher{
HeaderPutter: mockHeaderDagPutter,

View File

@ -124,7 +124,7 @@ var _ = Describe("API", func() {
})
Describe("GetTransactionByHash", func() {
It("Retrieves the head block number", func() {
It("Retrieves a transaction by hash", func() {
hash := mocks.MockTransactions[0].Hash()
tx, err := api.GetTransactionByHash(context.Background(), hash)
Expect(err).ToNot(HaveOccurred())

View File

@ -326,12 +326,12 @@ func (b *Backend) BlockByHash(ctx context.Context, hash common.Hash) (*types.Blo
// GetTransaction retrieves a tx by hash
// It also returns the blockhash, blocknumber, and tx index associated with the transaction
func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) {
pgStr := `SELECT transaction_cids.cid, transaction_cids.index, header_cids.block_hash, header_cids.block_number
pgStr := `SELECT transaction_cids.mh_key, transaction_cids.index, header_cids.block_hash, header_cids.block_number
FROM eth.transaction_cids, eth.header_cids
WHERE transaction_cids.header_id = header_cids.id
AND transaction_cids.tx_hash = $1`
var txCIDWithHeaderInfo struct {
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Index int64 `db:"index"`
BlockHash string `db:"block_hash"`
BlockNumber int64 `db:"block_number"`
@ -356,7 +356,7 @@ func (b *Backend) GetTransaction(ctx context.Context, txHash common.Hash) (*type
}
}()
txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{CID: txCIDWithHeaderInfo.CID}})
txIPLD, err := b.Fetcher.FetchTrxs(tx, []TxModel{{MhKey: txCIDWithHeaderInfo.MhKey}})
if err != nil {
return nil, common.Hash{}, 0, 0, err
}

View File

@ -186,7 +186,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
results := make([]TxModel, 0)
id := 1
pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key,
transaction_cids.dst, transaction_cids.src, transaction_cids.index
FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.id = $%d`, id)
@ -210,8 +210,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract,
receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
@ -290,8 +290,8 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract,
receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
@ -387,7 +387,7 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id,
state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.state_path
state_cids.state_leaf_key, state_cids.node_type, state_cids.cid, state_cids.mh_key, state_cids.state_path
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.id = $1`
args = append(args, headerID)
@ -411,8 +411,8 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key,
storage_cids.node_type, storage_cids.cid, storage_cids.storage_path, state_cids.state_leaf_key
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type,
storage_cids.cid, storage_cids.mh_key, storage_cids.storage_path, state_cids.state_leaf_key
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
@ -607,8 +607,8 @@ func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) (
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.contract,
receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
receipt_cids.topic2s, receipt_cids.topic3s, receipt_cids.log_contracts
FROM eth.receipt_cids, eth.transaction_cids
WHERE tx_id = ANY($1::INTEGER[])

View File

@ -19,6 +19,8 @@ package eth_test
import (
"math/big"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
. "github.com/onsi/ginkgo"
@ -209,14 +211,14 @@ var (
var _ = Describe("Retriever", func() {
var (
db *postgres.DB
repo *eth2.CIDIndexer
repo *eth2.IPLDPublisherAndIndexer
retriever *eth2.CIDRetriever
)
BeforeEach(func() {
var err error
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth2.NewCIDIndexer(db)
repo = eth2.NewIPLDPublisherAndIndexer(db)
retriever = eth2.NewCIDRetriever(db)
})
AfterEach(func() {
@ -225,7 +227,7 @@ var _ = Describe("Retriever", func() {
Describe("Retrieve", func() {
BeforeEach(func() {
err := repo.Index(mocks.MockCIDPayload)
_, err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
})
It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() {
@ -395,6 +397,7 @@ var _ = Describe("Retriever", func() {
NodeType: 2,
StateKey: common.BytesToHash(mocks.AccountLeafKey).Hex(),
CID: mocks.State2CID.String(),
MhKey: mocks.State2MhKey,
Path: []byte{'\x0c'},
}))
@ -405,8 +408,12 @@ var _ = Describe("Retriever", func() {
})
Describe("RetrieveFirstBlockNumber", func() {
It("Throws an error if there are no blocks in the database", func() {
_, err := retriever.RetrieveFirstBlockNumber()
Expect(err).To(HaveOccurred())
})
It("Gets the number of the first block that has data in the database", func() {
err := repo.Index(mocks.MockCIDPayload)
_, err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -414,9 +421,9 @@ var _ = Describe("Retriever", func() {
})
It("Gets the number of the first block that has data in the database", func() {
payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "1010101"
err := repo.Index(&payload)
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(1010101)
_, err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -424,13 +431,13 @@ var _ = Describe("Retriever", func() {
})
It("Gets the number of the first block that has data in the database", func() {
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101"
payload1 := mocks.MockConvertedPayload
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.HeaderCID.BlockNumber = "5"
err := repo.Index(&payload1)
payload2.Block = newMockBlock(5)
_, err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -439,8 +446,12 @@ var _ = Describe("Retriever", func() {
})
Describe("RetrieveLastBlockNumber", func() {
It("Throws an error if there are no blocks in the database", func() {
_, err := retriever.RetrieveLastBlockNumber()
Expect(err).To(HaveOccurred())
})
It("Gets the number of the latest block that has data in the database", func() {
err := repo.Index(mocks.MockCIDPayload)
_, err := repo.Publish(mocks.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -448,9 +459,9 @@ var _ = Describe("Retriever", func() {
})
It("Gets the number of the latest block that has data in the database", func() {
payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "1010101"
err := repo.Index(&payload)
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(1010101)
_, err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -458,13 +469,13 @@ var _ = Describe("Retriever", func() {
})
It("Gets the number of the latest block that has data in the database", func() {
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101"
payload1 := mocks.MockConvertedPayload
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.HeaderCID.BlockNumber = "5"
err := repo.Index(&payload1)
payload2.Block = newMockBlock(5)
_, err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred())
@ -474,21 +485,20 @@ var _ = Describe("Retriever", func() {
Describe("RetrieveGapsInData", func() {
It("Doesn't return gaps if there are none", func() {
payload0 := *mocks.MockCIDPayload
payload0.HeaderCID.BlockNumber = "0"
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1"
payload0 := mocks.MockConvertedPayload
payload0.Block = newMockBlock(0)
payload1 := mocks.MockConvertedPayload
payload2 := payload1
payload2.HeaderCID.BlockNumber = "2"
payload2.Block = newMockBlock(2)
payload3 := payload2
payload3.HeaderCID.BlockNumber = "3"
err := repo.Index(&payload0)
payload3.Block = newMockBlock(3)
_, err := repo.Publish(payload0)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
_, err = repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
_, err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -496,9 +506,9 @@ var _ = Describe("Retriever", func() {
})
It("Returns the gap from 0 to the earliest block", func() {
payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "5"
err := repo.Index(&payload)
payload := mocks.MockConvertedPayload
payload.Block = newMockBlock(5)
_, err := repo.Publish(payload)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -508,17 +518,16 @@ var _ = Describe("Retriever", func() {
})
It("Can handle single block gaps", func() {
payload0 := *mocks.MockCIDPayload
payload0.HeaderCID.BlockNumber = "0"
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1"
payload0 := mocks.MockConvertedPayload
payload0.Block = newMockBlock(0)
payload1 := mocks.MockConvertedPayload
payload3 := payload1
payload3.HeaderCID.BlockNumber = "3"
err := repo.Index(&payload0)
payload3.Block = newMockBlock(3)
_, err := repo.Publish(payload0)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
_, err = repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
_, err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -528,13 +537,13 @@ var _ = Describe("Retriever", func() {
})
It("Finds gap between two entries", func() {
payload1 := *mocks.MockCIDPayload
payload1.HeaderCID.BlockNumber = "1010101"
payload1 := mocks.MockConvertedPayload
payload1.Block = newMockBlock(1010101)
payload2 := payload1
payload2.HeaderCID.BlockNumber = "0"
err := repo.Index(&payload1)
payload2.Block = newMockBlock(0)
_, err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
Expect(err).ToNot(HaveOccurred())
@ -544,49 +553,50 @@ var _ = Describe("Retriever", func() {
})
It("Finds gaps between multiple entries", func() {
payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "1010101"
payload1 := payload
payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1
payload2.HeaderCID.BlockNumber = "5"
payload3 := payload2
payload3.HeaderCID.BlockNumber = "100"
payload4 := payload3
payload4.HeaderCID.BlockNumber = "101"
payload5 := payload4
payload5.HeaderCID.BlockNumber = "102"
payload6 := payload4
payload6.HeaderCID.BlockNumber = "103"
payload7 := payload4
payload7.HeaderCID.BlockNumber = "104"
payload8 := payload4
payload8.HeaderCID.BlockNumber = "105"
payload9 := payload4
payload9.HeaderCID.BlockNumber = "106"
payload10 := payload5
payload10.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload)
payload1 := mocks.MockConvertedPayload
payload1.Block = newMockBlock(1010101)
payload2 := mocks.MockConvertedPayload
payload2.Block = newMockBlock(1)
payload3 := mocks.MockConvertedPayload
payload3.Block = newMockBlock(5)
payload4 := mocks.MockConvertedPayload
payload4.Block = newMockBlock(100)
payload5 := mocks.MockConvertedPayload
payload5.Block = newMockBlock(101)
payload6 := mocks.MockConvertedPayload
payload6.Block = newMockBlock(102)
payload7 := mocks.MockConvertedPayload
payload7.Block = newMockBlock(103)
payload8 := mocks.MockConvertedPayload
payload8.Block = newMockBlock(104)
payload9 := mocks.MockConvertedPayload
payload9.Block = newMockBlock(105)
payload10 := mocks.MockConvertedPayload
payload10.Block = newMockBlock(106)
payload11 := mocks.MockConvertedPayload
payload11.Block = newMockBlock(1000)
_, err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
_, err = repo.Publish(payload4)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload4)
_, err = repo.Publish(payload5)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload5)
_, err = repo.Publish(payload6)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload6)
_, err = repo.Publish(payload7)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload7)
_, err = repo.Publish(payload8)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload8)
_, err = repo.Publish(payload9)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload9)
_, err = repo.Publish(payload10)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload10)
_, err = repo.Publish(payload11)
Expect(err).ToNot(HaveOccurred())
gaps, err := retriever.RetrieveGapsInData(1)
@ -600,61 +610,63 @@ var _ = Describe("Retriever", func() {
})
It("Finds validation level gaps", func() {
payload := *mocks.MockCIDPayload
payload.HeaderCID.BlockNumber = "1010101"
payload1 := payload
payload1.HeaderCID.BlockNumber = "1"
payload2 := payload1
payload2.HeaderCID.BlockNumber = "5"
payload3 := payload2
payload3.HeaderCID.BlockNumber = "100"
payload4 := payload3
payload4.HeaderCID.BlockNumber = "101"
payload5 := payload4
payload5.HeaderCID.BlockNumber = "102"
payload6 := payload4
payload6.HeaderCID.BlockNumber = "103"
payload7 := payload4
payload7.HeaderCID.BlockNumber = "104"
payload8 := payload4
payload8.HeaderCID.BlockNumber = "105"
payload9 := payload4
payload9.HeaderCID.BlockNumber = "106"
payload10 := payload4
payload10.HeaderCID.BlockNumber = "107"
payload11 := payload4
payload11.HeaderCID.BlockNumber = "108"
payload12 := payload4
payload12.HeaderCID.BlockNumber = "109"
payload13 := payload5
payload13.HeaderCID.BlockNumber = "1000"
err := repo.Index(&payload)
payload1 := mocks.MockConvertedPayload
payload1.Block = newMockBlock(1010101)
payload2 := mocks.MockConvertedPayload
payload2.Block = newMockBlock(1)
payload3 := mocks.MockConvertedPayload
payload3.Block = newMockBlock(5)
payload4 := mocks.MockConvertedPayload
payload4.Block = newMockBlock(100)
payload5 := mocks.MockConvertedPayload
payload5.Block = newMockBlock(101)
payload6 := mocks.MockConvertedPayload
payload6.Block = newMockBlock(102)
payload7 := mocks.MockConvertedPayload
payload7.Block = newMockBlock(103)
payload8 := mocks.MockConvertedPayload
payload8.Block = newMockBlock(104)
payload9 := mocks.MockConvertedPayload
payload9.Block = newMockBlock(105)
payload10 := mocks.MockConvertedPayload
payload10.Block = newMockBlock(106)
payload11 := mocks.MockConvertedPayload
payload11.Block = newMockBlock(107)
payload12 := mocks.MockConvertedPayload
payload12.Block = newMockBlock(108)
payload13 := mocks.MockConvertedPayload
payload13.Block = newMockBlock(109)
payload14 := mocks.MockConvertedPayload
payload14.Block = newMockBlock(1000)
_, err := repo.Publish(payload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload1)
_, err = repo.Publish(payload2)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload2)
_, err = repo.Publish(payload3)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload3)
_, err = repo.Publish(payload4)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload4)
_, err = repo.Publish(payload5)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload5)
_, err = repo.Publish(payload6)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload6)
_, err = repo.Publish(payload7)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload7)
_, err = repo.Publish(payload8)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload8)
_, err = repo.Publish(payload9)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload9)
_, err = repo.Publish(payload10)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload10)
_, err = repo.Publish(payload11)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload11)
_, err = repo.Publish(payload12)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload12)
_, err = repo.Publish(payload13)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(&payload13)
_, err = repo.Publish(payload14)
Expect(err).ToNot(HaveOccurred())
cleaner := eth.NewCleaner(db)
@ -675,3 +687,9 @@ var _ = Describe("Retriever", func() {
})
})
})
func newMockBlock(blockNumber uint64) *types.Block {
header := mocks.MockHeader
header.Number.SetUint64(blockNumber)
return types.NewBlock(&mocks.MockHeader, mocks.MockTransactions, nil, mocks.MockReceipts)
}

View File

@ -243,7 +243,7 @@ func (c *Cleaner) cleanFull(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanStorageIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.storage_cids B, eth.state_cids C, eth.header_cids D
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.state_id = C.id
AND C.header_id = D.id
AND D.block_number BETWEEN $1 AND $2`
@ -264,7 +264,7 @@ func (c *Cleaner) cleanStorageMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanStateIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.state_cids B, eth.header_cids C
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
@ -283,7 +283,7 @@ func (c *Cleaner) cleanStateMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanReceiptIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.receipt_cids B, eth.transaction_cids C, eth.header_cids D
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.tx_id = C.id
AND C.header_id = D.id
AND D.block_number BETWEEN $1 AND $2`
@ -304,7 +304,7 @@ func (c *Cleaner) cleanReceiptMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanTransactionIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.transaction_cids B, eth.header_cids C
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
@ -323,7 +323,7 @@ func (c *Cleaner) cleanTransactionMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanUncleIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.uncle_cids B, eth.header_cids C
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.header_id = C.id
AND C.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
@ -342,7 +342,7 @@ func (c *Cleaner) cleanUncleMetaData(tx *sqlx.Tx, rng [2]uint64) error {
func (c *Cleaner) cleanHeaderIPLDs(tx *sqlx.Tx, rng [2]uint64) error {
pgStr := `DELETE FROM public.blocks A
USING eth.header_cids B
WHERE A.key = B.cid
WHERE A.key = B.mh_key
AND B.block_number BETWEEN $1 AND $2`
_, err := tx.Exec(pgStr, rng[0], rng[1])
return err

View File

@ -34,47 +34,55 @@ var (
// header variables
blockHash1 = crypto.Keccak256Hash([]byte{00, 02})
blocKNumber1 = big.NewInt(0)
headerCID1 = "mockHeader1CID"
headerCID1 = shared.TestCID([]byte("mockHeader1CID"))
headerMhKey1 = shared.MultihashKeyFromCID(headerCID1)
parentHash = crypto.Keccak256Hash([]byte{00, 01})
totalDifficulty = "50000000000000000000"
reward = "5000000000000000000"
headerModel = eth.HeaderModel{
BlockHash: blockHash1.String(),
BlockNumber: blocKNumber1.String(),
CID: headerCID1,
CID: headerCID1.String(),
MhKey: headerMhKey1,
ParentHash: parentHash.String(),
TotalDifficulty: totalDifficulty,
Reward: reward,
}
// tx variables
tx1CID = "mockTx1CID"
tx2CID = "mockTx2CID"
tx1CID = shared.TestCID([]byte("mockTx1CID"))
tx1MhKey = shared.MultihashKeyFromCID(tx1CID)
tx2CID = shared.TestCID([]byte("mockTx2CID"))
tx2MhKey = shared.MultihashKeyFromCID(tx2CID)
tx1Hash = crypto.Keccak256Hash([]byte{01, 01})
tx2Hash = crypto.Keccak256Hash([]byte{01, 02})
txSrc = common.HexToAddress("0x010a")
txDst = common.HexToAddress("0x020a")
txModels1 = []eth.TxModel{
{
CID: tx1CID,
CID: tx1CID.String(),
MhKey: tx1MhKey,
TxHash: tx1Hash.String(),
Index: 0,
},
{
CID: tx2CID,
CID: tx2CID.String(),
MhKey: tx2MhKey,
TxHash: tx2Hash.String(),
Index: 1,
},
}
// uncle variables
uncleCID = "mockUncle1CID"
uncleCID = shared.TestCID([]byte("mockUncle1CID"))
uncleMhKey = shared.MultihashKeyFromCID(uncleCID)
uncleHash = crypto.Keccak256Hash([]byte{02, 02})
uncleParentHash = crypto.Keccak256Hash([]byte{02, 01})
uncleReward = "1000000000000000000"
uncleModels1 = []eth.UncleModel{
{
CID: uncleCID,
CID: uncleCID.String(),
MhKey: uncleMhKey,
Reward: uncleReward,
BlockHash: uncleHash.String(),
ParentHash: uncleParentHash.String(),
@ -82,37 +90,45 @@ var (
}
// receipt variables
rct1CID = "mockRct1CID"
rct2CID = "mockRct2CID"
rct1CID = shared.TestCID([]byte("mockRct1CID"))
rct1MhKey = shared.MultihashKeyFromCID(rct1CID)
rct2CID = shared.TestCID([]byte("mockRct2CID"))
rct2MhKey = shared.MultihashKeyFromCID(rct2CID)
rct1Contract = common.Address{}
rct2Contract = common.HexToAddress("0x010c")
receiptModels1 = map[common.Hash]eth.ReceiptModel{
tx1Hash: {
CID: rct1CID,
CID: rct1CID.String(),
MhKey: rct1MhKey,
ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(),
},
tx2Hash: {
CID: rct2CID,
CID: rct2CID.String(),
MhKey: rct2MhKey,
ContractHash: crypto.Keccak256Hash(rct2Contract.Bytes()).String(),
},
}
// state variables
state1CID1 = "mockState1CID1"
state1CID1 = shared.TestCID([]byte("mockState1CID1"))
state1MhKey1 = shared.MultihashKeyFromCID(state1CID1)
state1Path = []byte{'\x01'}
state1Key = crypto.Keccak256Hash(txSrc.Bytes())
state2CID1 = "mockState2CID1"
state2CID1 = shared.TestCID([]byte("mockState2CID1"))
state2MhKey1 = shared.MultihashKeyFromCID(state2CID1)
state2Path = []byte{'\x02'}
state2Key = crypto.Keccak256Hash(txDst.Bytes())
stateModels1 = []eth.StateNodeModel{
{
CID: state1CID1,
CID: state1CID1.String(),
MhKey: state1MhKey1,
Path: state1Path,
NodeType: 2,
StateKey: state1Key.String(),
},
{
CID: state2CID1,
CID: state2CID1.String(),
MhKey: state2MhKey1,
Path: state2Path,
NodeType: 2,
StateKey: state2Key.String(),
@ -120,13 +136,15 @@ var (
}
// storage variables
storageCID = "mockStorageCID1"
storageCID = shared.TestCID([]byte("mockStorageCID1"))
storageMhKey = shared.MultihashKeyFromCID(storageCID)
storagePath = []byte{'\x01'}
storageKey = crypto.Keccak256Hash(common.Hex2Bytes("0x0000000000000000000000000000000000000000000000000000000000000000"))
storageModels1 = map[string][]eth.StorageNodeModel{
common.Bytes2Hex(state1Path): {
{
CID: storageCID,
CID: storageCID.String(),
MhKey: storageMhKey,
StorageKey: storageKey.String(),
Path: storagePath,
NodeType: 2,
@ -146,39 +164,47 @@ var (
// header variables
blockHash2 = crypto.Keccak256Hash([]byte{00, 03})
blocKNumber2 = big.NewInt(1)
headerCID2 = "mockHeaderCID2"
headerCID2 = shared.TestCID([]byte("mockHeaderCID2"))
headerMhKey2 = shared.MultihashKeyFromCID(headerCID2)
headerModel2 = eth.HeaderModel{
BlockHash: blockHash2.String(),
BlockNumber: blocKNumber2.String(),
CID: headerCID2,
CID: headerCID2.String(),
MhKey: headerMhKey2,
ParentHash: blockHash1.String(),
TotalDifficulty: totalDifficulty,
Reward: reward,
}
// tx variables
tx3CID = "mockTx3CID"
tx3CID = shared.TestCID([]byte("mockTx3CID"))
tx3MhKey = shared.MultihashKeyFromCID(tx3CID)
tx3Hash = crypto.Keccak256Hash([]byte{01, 03})
txModels2 = []eth.TxModel{
{
CID: tx3CID,
CID: tx3CID.String(),
MhKey: tx3MhKey,
TxHash: tx3Hash.String(),
Index: 0,
},
}
// receipt variables
rct3CID = "mockRct3CID"
rct3CID = shared.TestCID([]byte("mockRct3CID"))
rct3MhKey = shared.MultihashKeyFromCID(rct3CID)
receiptModels2 = map[common.Hash]eth.ReceiptModel{
tx3Hash: {
CID: rct3CID,
CID: rct3CID.String(),
MhKey: rct3MhKey,
ContractHash: crypto.Keccak256Hash(rct1Contract.Bytes()).String(),
},
}
// state variables
state1CID2 = "mockState1CID2"
state1CID2 = shared.TestCID([]byte("mockState1CID2"))
state1MhKey2 = shared.MultihashKeyFromCID(state1CID2)
stateModels2 = []eth.StateNodeModel{
{
CID: state1CID2,
CID: state1CID2.String(),
MhKey: state1MhKey2,
Path: state1Path,
NodeType: 2,
StateKey: state1Key.String(),
@ -191,20 +217,20 @@ var (
StateNodeCIDs: stateModels2,
}
rngs = [][2]uint64{{0, 1}}
cids = []string{
headerCID1,
headerCID2,
uncleCID,
tx1CID,
tx2CID,
tx3CID,
rct1CID,
rct2CID,
rct3CID,
state1CID1,
state2CID1,
state1CID2,
storageCID,
mhKeys = []string{
headerMhKey1,
headerMhKey2,
uncleMhKey,
tx1MhKey,
tx2MhKey,
tx3MhKey,
rct1MhKey,
rct2MhKey,
rct3MhKey,
state1MhKey1,
state2MhKey1,
state1MhKey2,
storageMhKey,
}
mockData = []byte{'\x01'}
)
@ -224,16 +250,16 @@ var _ = Describe("Cleaner", func() {
})
Describe("Clean", func() {
BeforeEach(func() {
for _, key := range mhKeys {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData)
Expect(err).ToNot(HaveOccurred())
}
err := repo.Index(mockCIDPayload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(mockCIDPayload2)
Expect(err).ToNot(HaveOccurred())
for _, cid := range cids {
_, err = db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, cid, mockData)
Expect(err).ToNot(HaveOccurred())
}
tx, err := db.Beginx()
Expect(err).ToNot(HaveOccurred())
@ -613,6 +639,11 @@ var _ = Describe("Cleaner", func() {
Describe("ResetValidation", func() {
BeforeEach(func() {
for _, key := range mhKeys {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2)`, key, mockData)
Expect(err).ToNot(HaveOccurred())
}
err := repo.Index(mockCIDPayload1)
Expect(err).ToNot(HaveOccurred())
err = repo.Index(mockCIDPayload2)

View File

@ -72,8 +72,8 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
return nil, err
}
txMeta := TxModel{
Dst: shared.HandleNullAddrPointer(trx.To()),
Src: shared.HandleNullAddr(from),
Dst: shared.HandleZeroAddrPointer(trx.To()),
Src: shared.HandleZeroAddr(from),
TxHash: trx.Hash().String(),
Index: int64(i),
}
@ -106,7 +106,7 @@ func (pc *PayloadConverter) Convert(payload shared.RawChainData) (shared.Convert
logContracts = append(logContracts, addr)
}
// This is the contract address if this receipt is for a contract creation tx
contract := shared.HandleNullAddr(receipt.ContractAddress)
contract := shared.HandleZeroAddr(receipt.ContractAddress)
var contractHash string
if contract != "" {
contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()

View File

@ -90,29 +90,29 @@ func (in *CIDIndexer) Index(cids shared.CIDsForIndexing) error {
func (in *CIDIndexer) indexHeaderCID(tx *sqlx.Tx, header HeaderModel) (int64, error) {
var headerID int64
err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, times_validated) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, eth.header_cids.times_validated + 1)
err := tx.QueryRowx(`INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1)
RETURNING id`,
header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, 1).Scan(&headerID)
header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1).Scan(&headerID)
return headerID, err
}
func (in *CIDIndexer) indexUncleCID(tx *sqlx.Tx, uncle UncleModel, headerID int64) error {
_, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward) = ($3, $4, $5)`,
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward)
_, err := tx.Exec(`INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = ($3, $4, $5, $6)`,
uncle.BlockHash, headerID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
return err
}
func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPayload, headerID int64) error {
for _, trxCidMeta := range payload.TransactionCIDs {
var txID int64
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index) = ($3, $4, $5, $6)
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7)
RETURNING id`,
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index).Scan(&txID)
headerID, trxCidMeta.TxHash, trxCidMeta.CID, trxCidMeta.Dst, trxCidMeta.Src, trxCidMeta.Index, trxCidMeta.MhKey).Scan(&txID)
if err != nil {
return err
}
@ -128,17 +128,17 @@ func (in *CIDIndexer) indexTransactionAndReceiptCIDs(tx *sqlx.Tx, payload *CIDPa
func (in *CIDIndexer) indexTransactionCID(tx *sqlx.Tx, transaction TxModel, headerID int64) (int64, error) {
var txID int64
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index) = ($3, $4, $5, $6)
err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key) = ($3, $4, $5, $6, $7)
RETURNING id`,
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index).Scan(&txID)
headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey).Scan(&txID)
return txID, err
}
func (in *CIDIndexer) indexReceiptCID(tx *sqlx.Tx, cidMeta ReceiptModel, txID int64) error {
_, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts) = ($2, $3, $4, $5, $6, $7, $8, $9)`,
txID, cidMeta.CID, cidMeta.Contract, cidMeta.ContractHash, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s, cidMeta.LogContracts)
_, err := tx.Exec(`INSERT INTO eth.receipt_cids (tx_id, cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (tx_id) DO UPDATE SET (cid, contract, contract_hash, topic0s, topic1s, topic2s, topic3s, log_contracts, mh_key) = ($2, $3, $4, $5, $6, $7, $8, $9, $10)`,
txID, cidMeta.CID, cidMeta.Contract, cidMeta.ContractHash, cidMeta.Topic0s, cidMeta.Topic1s, cidMeta.Topic2s, cidMeta.Topic3s, cidMeta.LogContracts, cidMeta.MhKey)
return err
}
@ -149,10 +149,10 @@ func (in *CIDIndexer) indexStateAndStorageCIDs(tx *sqlx.Tx, payload *CIDPayload,
if stateCID.StateKey != nullHash.String() {
stateKey = stateCID.StateKey
}
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)
RETURNING id`,
headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true).Scan(&stateID)
headerID, stateKey, stateCID.CID, stateCID.Path, stateCID.NodeType, true, stateCID.MhKey).Scan(&stateID)
if err != nil {
return err
}
@ -180,10 +180,10 @@ func (in *CIDIndexer) indexStateCID(tx *sqlx.Tx, stateNode StateNodeModel, heade
if stateNode.StateKey != nullHash.String() {
stateKey = stateNode.StateKey
}
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)
err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (header_id, state_path, diff) DO UPDATE SET (state_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)
RETURNING id`,
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true).Scan(&stateID)
headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID)
return stateID, err
}
@ -199,8 +199,8 @@ func (in *CIDIndexer) indexStorageCID(tx *sqlx.Tx, storageCID StorageNodeModel,
if storageCID.StorageKey != nullHash.String() {
storageKey = storageCID.StorageKey
}
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff) VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff) = ($2, $3, $5, $6)`,
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true)
_, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (state_id, storage_path, diff) DO UPDATE SET (storage_leaf_key, cid, node_type, mh_key) = ($2, $3, $5, $7)`,
stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
return err
}

View File

@ -37,6 +37,17 @@ var _ = Describe("Indexer", func() {
db, err = shared.SetupDB()
Expect(err).ToNot(HaveOccurred())
repo = eth.NewCIDIndexer(db)
// need entries in the public.blocks with the mhkeys or the FK constraint will fail
shared.PublishMockIPLD(db, mocks.HeaderMhKey, mockData)
shared.PublishMockIPLD(db, mocks.Trx1MhKey, mockData)
shared.PublishMockIPLD(db, mocks.Trx2MhKey, mockData)
shared.PublishMockIPLD(db, mocks.Trx3MhKey, mockData)
shared.PublishMockIPLD(db, mocks.Rct1MhKey, mockData)
shared.PublishMockIPLD(db, mocks.Rct2MhKey, mockData)
shared.PublishMockIPLD(db, mocks.Rct3MhKey, mockData)
shared.PublishMockIPLD(db, mocks.State1MhKey, mockData)
shared.PublishMockIPLD(db, mocks.State2MhKey, mockData)
shared.PublishMockIPLD(db, mocks.StorageMhKey, mockData)
})
AfterEach(func() {
eth.TearDownDB(db)

View File

@ -102,7 +102,7 @@ func (f *IPLDPGFetcher) Fetch(cids shared.CIDsForFetching) (shared.IPLDs, error)
// FetchHeaders fetches headers
func (f *IPLDPGFetcher) FetchHeader(tx *sqlx.Tx, c HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLD(tx, c.CID)
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return ipfs.BlockModel{}, err
}
@ -117,7 +117,7 @@ func (f *IPLDPGFetcher) FetchUncles(tx *sqlx.Tx, cids []UncleModel) ([]ipfs.Bloc
log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
uncleBytes, err := shared.FetchIPLD(tx, c.CID)
uncleBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
@ -134,7 +134,7 @@ func (f *IPLDPGFetcher) FetchTrxs(tx *sqlx.Tx, cids []TxModel) ([]ipfs.BlockMode
log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
txBytes, err := shared.FetchIPLD(tx, c.CID)
txBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
@ -151,7 +151,7 @@ func (f *IPLDPGFetcher) FetchRcts(tx *sqlx.Tx, cids []ReceiptModel) ([]ipfs.Bloc
log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids {
rctBytes, err := shared.FetchIPLD(tx, c.CID)
rctBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil {
return nil, err
}
@ -171,7 +171,7 @@ func (f *IPLDPGFetcher) FetchState(tx *sqlx.Tx, cids []StateNodeModel) ([]StateN
if stateNode.CID == "" {
continue
}
stateBytes, err := shared.FetchIPLD(tx, stateNode.CID)
stateBytes, err := shared.FetchIPLDByMhKey(tx, stateNode.MhKey)
if err != nil {
return nil, err
}
@ -196,7 +196,7 @@ func (f *IPLDPGFetcher) FetchStorage(tx *sqlx.Tx, cids []StorageNodeWithStateKey
if storageNode.CID == "" || storageNode.StateKey == "" {
continue
}
storageBytes, err := shared.FetchIPLD(tx, storageNode.CID)
storageBytes, err := shared.FetchIPLDByMhKey(tx, storageNode.MhKey)
if err != nil {
return nil, err
}

View File

@ -22,6 +22,8 @@ import (
"crypto/rand"
"math/big"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/shared"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
@ -76,18 +78,29 @@ var (
Data: []byte{},
}
HeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, MockHeaderRlp, multihash.KECCAK_256)
HeaderMhKey = shared.MultihashKeyFromCID(HeaderCID)
Trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(0), multihash.KECCAK_256)
Trx1MhKey = shared.MultihashKeyFromCID(Trx1CID)
Trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(1), multihash.KECCAK_256)
Trx2MhKey = shared.MultihashKeyFromCID(Trx2CID)
Trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, MockTransactions.GetRlp(2), multihash.KECCAK_256)
Trx3MhKey = shared.MultihashKeyFromCID(Trx3CID)
Rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(0), multihash.KECCAK_256)
Rct1MhKey = shared.MultihashKeyFromCID(Rct1CID)
Rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(1), multihash.KECCAK_256)
Rct2MhKey = shared.MultihashKeyFromCID(Rct2CID)
Rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, MockReceipts.GetRlp(2), multihash.KECCAK_256)
Rct3MhKey = shared.MultihashKeyFromCID(Rct3CID)
State1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, ContractLeafNode, multihash.KECCAK_256)
State1MhKey = shared.MultihashKeyFromCID(State1CID)
State2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, AccountLeafNode, multihash.KECCAK_256)
State2MhKey = shared.MultihashKeyFromCID(State2CID)
StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256)
StorageMhKey = shared.MultihashKeyFromCID(StorageCID)
MockTrxMeta = []eth.TxModel{
{
CID: "", // This is empty until we go to publish to ipfs
MhKey: "",
Src: SenderAddr.Hex(),
Dst: Address.String(),
Index: 0,
@ -95,6 +108,7 @@ var (
},
{
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
@ -102,6 +116,7 @@ var (
},
{
CID: "",
MhKey: "",
Src: SenderAddr.Hex(),
Dst: "",
Index: 2,
@ -111,6 +126,7 @@ var (
MockTrxMetaPostPublsh = []eth.TxModel{
{
CID: Trx1CID.String(), // This is empty until we go to publish to ipfs
MhKey: Trx1MhKey,
Src: SenderAddr.Hex(),
Dst: Address.String(),
Index: 0,
@ -118,6 +134,7 @@ var (
},
{
CID: Trx2CID.String(),
MhKey: Trx2MhKey,
Src: SenderAddr.Hex(),
Dst: AnotherAddress.String(),
Index: 1,
@ -125,6 +142,7 @@ var (
},
{
CID: Trx3CID.String(),
MhKey: Trx3MhKey,
Src: SenderAddr.Hex(),
Dst: "",
Index: 2,
@ -134,6 +152,7 @@ var (
MockRctMeta = []eth.ReceiptModel{
{
CID: "",
MhKey: "",
Topic0s: []string{
mockTopic11.String(),
},
@ -148,6 +167,7 @@ var (
},
{
CID: "",
MhKey: "",
Topic0s: []string{
mockTopic21.String(),
},
@ -162,6 +182,7 @@ var (
},
{
CID: "",
MhKey: "",
Contract: ContractAddress.String(),
ContractHash: ContractHash,
LogContracts: []string{},
@ -170,6 +191,7 @@ var (
MockRctMetaPostPublish = []eth.ReceiptModel{
{
CID: Rct1CID.String(),
MhKey: Rct1MhKey,
Topic0s: []string{
mockTopic11.String(),
},
@ -184,6 +206,7 @@ var (
},
{
CID: Rct2CID.String(),
MhKey: Rct2MhKey,
Topic0s: []string{
mockTopic21.String(),
},
@ -198,6 +221,7 @@ var (
},
{
CID: Rct3CID.String(),
MhKey: Rct3MhKey,
Contract: ContractAddress.String(),
ContractHash: ContractHash,
LogContracts: []string{},
@ -296,12 +320,14 @@ var (
MockStateMetaPostPublish = []eth.StateNodeModel{
{
CID: State1CID.String(),
MhKey: State1MhKey,
Path: []byte{'\x06'},
NodeType: 2,
StateKey: common.BytesToHash(ContractLeafKey).Hex(),
},
{
CID: State2CID.String(),
MhKey: State2MhKey,
Path: []byte{'\x0c'},
NodeType: 2,
StateKey: common.BytesToHash(AccountLeafKey).Hex(),
@ -341,6 +367,7 @@ var (
BlockHash: MockBlock.Hash().String(),
BlockNumber: MockBlock.Number().String(),
CID: HeaderCID.String(),
MhKey: HeaderMhKey,
ParentHash: MockBlock.ParentHash().String(),
TotalDifficulty: MockBlock.Difficulty().String(),
Reward: "5000000000000000000",
@ -363,6 +390,7 @@ var (
contractPath: {
{
CID: StorageCID.String(),
MhKey: StorageMhKey,
Path: []byte{},
StorageKey: common.BytesToHash(StorageLeafKey).Hex(),
NodeType: 2,
@ -392,6 +420,7 @@ var (
BlockHash: MockBlock.Hash().String(),
ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000",
CID: HeaderCID.String(),
MhKey: HeaderMhKey,
TotalDifficulty: MockBlock.Difficulty().String(),
Reward: "5000000000000000000",
StateRoot: MockBlock.Root().String(),
@ -410,6 +439,7 @@ var (
{
Path: []byte{},
CID: StorageCID.String(),
MhKey: StorageMhKey,
NodeType: 2,
StateKey: common.BytesToHash(ContractLeafKey).Hex(),
StorageKey: common.BytesToHash(StorageLeafKey).Hex(),

View File

@ -25,6 +25,7 @@ type HeaderModel struct {
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
TotalDifficulty string `db:"td"`
NodeID int64 `db:"node_id"`
Reward string `db:"reward"`
@ -44,6 +45,7 @@ type UncleModel struct {
BlockHash string `db:"block_hash"`
ParentHash string `db:"parent_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Reward string `db:"reward"`
}
@ -54,6 +56,7 @@ type TxModel struct {
Index int64 `db:"index"`
TxHash string `db:"tx_hash"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Dst string `db:"dst"`
Src string `db:"src"`
}
@ -63,6 +66,7 @@ type ReceiptModel struct {
ID int64 `db:"id"`
TxID int64 `db:"tx_id"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Contract string `db:"contract"`
ContractHash string `db:"contract_hash"`
LogContracts pq.StringArray `db:"log_contracts"`
@ -80,6 +84,7 @@ type StateNodeModel struct {
StateKey string `db:"state_leaf_key"`
NodeType int `db:"node_type"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Diff bool `db:"diff"`
}
@ -91,6 +96,7 @@ type StorageNodeModel struct {
StorageKey string `db:"storage_leaf_key"`
NodeType int `db:"node_type"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Diff bool `db:"diff"`
}
@ -103,6 +109,7 @@ type StorageNodeWithStateKeyModel struct {
StorageKey string `db:"storage_leaf_key"`
NodeType int `db:"node_type"`
CID string `db:"cid"`
MhKey string `db:"mh_key"`
Diff bool `db:"diff"`
}

View File

@ -92,6 +92,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
header := HeaderModel{
CID: headerNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: ipldPayload.Block.ParentHash().String(),
BlockNumber: ipldPayload.Block.Number().String(),
BlockHash: ipldPayload.Block.Hash().String(),
@ -117,6 +118,7 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncleNode.Number.Int64())
uncle := UncleModel{
CID: uncleNode.Cid().String(),
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
ParentHash: uncleNode.ParentHash.String(),
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
@ -137,12 +139,14 @@ func (pub *IPLDPublisherAndIndexer) Publish(payload shared.ConvertedData) (share
}
txModel := ipldPayload.TxMetaData[i]
txModel.CID = txNode.Cid().String()
txModel.MhKey = shared.MultihashKeyFromCID(txNode.Cid())
txID, err := pub.indexer.indexTransactionCID(tx, txModel, headerID)
if err != nil {
return nil, err
}
rctModel := ipldPayload.ReceiptMetaData[i]
rctModel.CID = rctNode.Cid().String()
rctModel.MhKey = shared.MultihashKeyFromCID(rctNode.Cid())
if err := pub.indexer.indexReceiptCID(tx, rctModel, txID); err != nil {
return nil, err
}
@ -162,10 +166,12 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
if err != nil {
return err
}
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
stateModel := StateNodeModel{
Path: stateNode.Path,
StateKey: stateNode.LeafKey.String(),
CID: stateCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(stateNode.Type),
}
stateID, err := pub.indexer.indexStateCID(tx, stateModel, headerID)
@ -199,10 +205,12 @@ func (pub *IPLDPublisherAndIndexer) publishAndIndexStateAndStorage(tx *sqlx.Tx,
if err != nil {
return err
}
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
storageModel := StorageNodeModel{
Path: storageNode.Path,
StorageKey: storageNode.LeafKey.Hex(),
CID: storageCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(storageNode.Type),
}
if err := pub.indexer.indexStorageCID(tx, storageModel, stateID); err != nil {

View File

@ -79,6 +79,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
reward := CalcEthBlockReward(ipldPayload.Block.Header(), ipldPayload.Block.Uncles(), ipldPayload.Block.Transactions(), ipldPayload.Receipts)
header := HeaderModel{
CID: headerCid,
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
ParentHash: ipldPayload.Block.ParentHash().String(),
BlockNumber: ipldPayload.Block.Number().String(),
BlockHash: ipldPayload.Block.Hash().String(),
@ -102,6 +103,7 @@ func (pub *IPLDPublisher) Publish(payload shared.ConvertedData) (shared.CIDsForI
uncleReward := CalcUncleMinerReward(ipldPayload.Block.Number().Int64(), uncle.Number.Int64())
uncleCids[i] = UncleModel{
CID: uncleCid,
MhKey: shared.MultihashKeyFromCID(uncle.Cid()),
ParentHash: uncle.ParentHash.String(),
BlockHash: uncle.Hash().String(),
Reward: uncleReward.String(),
@ -162,6 +164,7 @@ func (pub *IPLDPublisher) publishTransactions(transactions []*ipld.EthTx, txTrie
}
trxCids[i] = TxModel{
CID: cid,
MhKey: shared.MultihashKeyFromCID(tx.Cid()),
Index: trxMeta[i].Index,
TxHash: trxMeta[i].TxHash,
Src: trxMeta[i].Src,
@ -186,6 +189,7 @@ func (pub *IPLDPublisher) publishReceipts(receipts []*ipld.EthReceipt, receiptTr
}
rctCids[rct.TxHash] = ReceiptModel{
CID: cid,
MhKey: shared.MultihashKeyFromCID(rct.Cid()),
Contract: receiptMeta[i].Contract,
ContractHash: receiptMeta[i].ContractHash,
Topic0s: receiptMeta[i].Topic0s,
@ -220,6 +224,7 @@ func (pub *IPLDPublisher) publishStateNodes(stateNodes []TrieNode) ([]StateNodeM
Path: stateNode.Path,
StateKey: stateNode.LeafKey.String(),
CID: cid,
MhKey: shared.MultihashKeyFromCID(node.Cid()),
NodeType: ResolveFromNodeType(stateNode.Type),
})
// If we have a leaf, decode the account to extract additional metadata for indexing
@ -266,6 +271,7 @@ func (pub *IPLDPublisher) publishStorageNodes(storageNodes map[string][]TrieNode
Path: storageNode.Path,
StorageKey: storageNode.LeafKey.Hex(),
CID: cid,
MhKey: shared.MultihashKeyFromCID(node.Cid()),
NodeType: ResolveFromNodeType(storageNode.Type),
})
}

View File

@ -17,8 +17,6 @@
package shared
import (
"bytes"
"github.com/ethereum/go-ethereum/common"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
@ -26,51 +24,19 @@ import (
node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs/ipld"
)
// ListContainsString used to check if a list of strings contains a particular string
func ListContainsString(sss []string, s string) bool {
for _, str := range sss {
if s == str {
return true
}
}
return false
}
// IPLDsContainBytes used to check if a list of strings contains a particular string
func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool {
for _, ipld := range iplds {
if bytes.Equal(ipld.Data, b) {
return true
}
}
return false
}
// ListContainsGap used to check if a list of Gaps contains a particular Gap
func ListContainsGap(gapList []Gap, gap Gap) bool {
for _, listGap := range gapList {
if listGap == gap {
return true
}
}
return false
}
// HandleNullAddrPointer will return an emtpy string for a nil address pointer
func HandleNullAddrPointer(to *common.Address) string {
// HandleZeroAddrPointer will return an emtpy string for a nil address pointer
func HandleZeroAddrPointer(to *common.Address) string {
if to == nil {
return ""
}
return to.Hex()
}
// HandleNullAddr will return an empty string for a a null address
func HandleNullAddr(to common.Address) string {
// HandleZeroAddr will return an empty string for a 0 value address
func HandleZeroAddr(to common.Address) string {
if to.Hex() == "0x0000000000000000000000000000000000000000" {
return ""
}
@ -93,7 +59,7 @@ func PublishIPLD(tx *sqlx.Tx, i node.Node) error {
return err
}
// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx
// FetchIPLD is used to retrieve an ipld from Postgres blockstore with the provided tx and cid string
func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) {
mhKey, err := MultihashKeyFromCIDString(cid)
if err != nil {
@ -104,6 +70,19 @@ func FetchIPLD(tx *sqlx.Tx, cid string) ([]byte, error) {
return block, tx.Get(&block, pgStr, mhKey)
}
// FetchIPLDByMhKey is used to retrieve an ipld from Postgres blockstore with the provided tx and mhkey string
func FetchIPLDByMhKey(tx *sqlx.Tx, mhKey string) ([]byte, error) {
pgStr := `SELECT data FROM public.blocks WHERE key = $1`
var block []byte
return block, tx.Get(&block, pgStr, mhKey)
}
// MultihashKeyFromCID converts a cid into a blockstore-prefixed multihash db key string
func MultihashKeyFromCID(c cid.Cid) string {
dbKey := dshelp.MultihashToDsKey(c.Hash())
return blockstore.BlockPrefix.String() + dbKey.String()
}
// MultihashKeyFromCIDString converts a cid string into a blockstore-prefixed multihash db key string
func MultihashKeyFromCIDString(c string) (string, error) {
dc, err := cid.Decode(c)

View File

@ -17,7 +17,13 @@
package shared
import (
"bytes"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/config"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/ipfs"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/node"
"github.com/vulcanize/ipfs-blockchain-watcher/pkg/postgres"
)
@ -30,3 +36,51 @@ func SetupDB() (*postgres.DB, error) {
Port: 5432,
}, node.Node{})
}
// ListContainsString used to check if a list of strings contains a particular string
func ListContainsString(sss []string, s string) bool {
for _, str := range sss {
if s == str {
return true
}
}
return false
}
// IPLDsContainBytes used to check if a list of strings contains a particular string
func IPLDsContainBytes(iplds []ipfs.BlockModel, b []byte) bool {
for _, ipld := range iplds {
if bytes.Equal(ipld.Data, b) {
return true
}
}
return false
}
// ListContainsGap used to check if a list of Gaps contains a particular Gap
func ListContainsGap(gapList []Gap, gap Gap) bool {
for _, listGap := range gapList {
if listGap == gap {
return true
}
}
return false
}
// TestCID creates a basic CID for testing purposes
func TestCID(b []byte) cid.Cid {
pref := cid.Prefix{
Version: 1,
Codec: cid.Raw,
MhType: multihash.KECCAK_256,
MhLength: -1,
}
c, _ := pref.Sum(b)
return c
}
// PublishMockIPLD writes a mhkey-data pair to the public.blocks table so that test data can FK reference the mhkey
func PublishMockIPLD(db *postgres.DB, mhKey string, mockData []byte) error {
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, mhKey, mockData)
return err
}