Remove ipld-eth-indexer dependency.

This commit is contained in:
Arijit Das 2021-08-12 11:53:41 +05:30
parent e00e602098
commit d8a5358a70
28 changed files with 468 additions and 279 deletions

View File

@ -28,9 +28,9 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/gap-filler/pkg/mux" "github.com/vulcanize/gap-filler/pkg/mux"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/graphql" "github.com/vulcanize/ipld-eth-server/pkg/graphql"
srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc" srpc "github.com/vulcanize/ipld-eth-server/pkg/rpc"
s "github.com/vulcanize/ipld-eth-server/pkg/serve" s "github.com/vulcanize/ipld-eth-server/pkg/serve"

View File

@ -16,6 +16,7 @@ CREATE TABLE eth.header_cids (
bloom BYTEA NOT NULL, bloom BYTEA NOT NULL,
timestamp NUMERIC NOT NULL, timestamp NUMERIC NOT NULL,
times_validated INTEGER NOT NULL DEFAULT 1, times_validated INTEGER NOT NULL DEFAULT 1,
base_fee BIGINT,
UNIQUE (block_number, block_hash) UNIQUE (block_number, block_hash)
); );

View File

@ -9,6 +9,7 @@ CREATE TABLE eth.transaction_cids (
dst VARCHAR(66) NOT NULL, dst VARCHAR(66) NOT NULL,
src VARCHAR(66) NOT NULL, src VARCHAR(66) NOT NULL,
tx_data BYTEA, tx_data BYTEA,
tx_type BYTEA,
UNIQUE (header_id, tx_hash) UNIQUE (header_id, tx_hash)
); );

View File

@ -0,0 +1,64 @@
-- +goose Up
CREATE TABLE eth.log_cids (
id SERIAL PRIMARY KEY,
receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
address TEXT NOT NULL,
cid TEXT NOT NULL,
mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
block_number BIGINT NOT NULL,
block_hash VARCHAR(66) NOT NULL,
tx_hash VARCHAR(66) NOT NULL,
tx_index INTEGER NOT NULL,
index INTEGER NOT NULL,
topic0s VARCHAR(66)[],
topic1s VARCHAR(66)[],
topic2s VARCHAR(66)[],
topic3s VARCHAR(66)[],
UNIQUE (block_hash, tx_hash, index)
);
-- TODO: Remove topics from receipts to avoid redundancy.
-- ALTER TABLE eth.receipt_cids
-- DROP COLUMN topic0s,
-- DROP COLUMN topic1s,
-- DROP COLUMN topic2s,
-- DROP COLUMN topic3s,
ALTER TABLE eth.receipt_cids
ADD COLUMN log_root VARCHAR(66);
CREATE INDEX log_rct_id_index ON eth.log_cids USING btree (receipt_id);
CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key);
CREATE INDEX log_cid_index ON eth.log_cids USING btree (cid);
--
-- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: -
--
CREATE INDEX log_topic0_index ON eth.log_cids USING gin (topic0s);
--
-- Name: log_topic1_index; Type: INDEX; Schema: eth; Owner: -
--
CREATE INDEX log_topic1_index ON eth.log_cids USING gin (topic1s);
--
-- Name: log_topic2_index; Type: INDEX; Schema: eth; Owner: -
--
CREATE INDEX log_topic2_index ON eth.log_cids USING gin (topic2s);
--
-- Name: log_topic3_index; Type: INDEX; Schema: eth; Owner: -
--
CREATE INDEX log_topic3_index ON eth.log_cids USING gin (topic3s);
-- +goose Down
DROP TABLE eth.logs;

3
go.mod
View File

@ -24,9 +24,8 @@ require (
github.com/tklauser/go-sysconf v0.3.6 // indirect github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/vulcanize/gap-filler v0.3.1 github.com/vulcanize/gap-filler v0.3.1
github.com/vulcanize/ipfs-ethdb v0.0.2-alpha github.com/vulcanize/ipfs-ethdb v0.0.2-alpha
github.com/vulcanize/ipld-eth-indexer v0.7.1-alpha.0.20210805022537-b4692fa49849
) )
replace github.com/ethereum/go-ethereum v1.9.25 => github.com/vulcanize/go-ethereum v1.10.4-statediff-0.0.25 replace github.com/ethereum/go-ethereum v1.9.25 => /Users/arijitdas/go/src/github.com/ethereum/go-ethereum
replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha

2
go.sum
View File

@ -497,6 +497,8 @@ github.com/huin/goupnp v1.0.0 h1:wg75sLpL6DZqwHQN6E1Cfk6mtfzS45z8OV+ic+DtHRo=
github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc= github.com/huin/goupnp v1.0.0/go.mod h1:n9v9KO1tAxYH82qOn+UTIFQDmx5n1Zxd/ClZDMX7Bnc=
github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88 h1:bcAj8KroPf552TScjFPIakjH2/tdIrIH8F+cc4v4SRo= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88 h1:bcAj8KroPf552TScjFPIakjH2/tdIrIH8F+cc4v4SRo=
github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88/go.mod h1:nNs7wvRfN1eKaMknBydLNQU6146XQim8t4h+q90biWo= github.com/huin/goupnp v1.0.1-0.20210310174557-0ca763054c88/go.mod h1:nNs7wvRfN1eKaMknBydLNQU6146XQim8t4h+q90biWo=
github.com/huin/goupnp v1.0.1-0.20210626160114-33cdcbb30dda h1:Vofqyy/Ysqit++X33unU0Gr08b6P35hKm3juytDrBVI=
github.com/huin/goupnp v1.0.1-0.20210626160114-33cdcbb30dda/go.mod h1:0dxJBVBHqTMjIUMkESDTNgOOx/Mw5wYIfyFmdzSamkM=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150 h1:vlNjIqmUZ9CMAWsbURYl3a6wZbw7q5RHVvlXTNS/Bs8= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150 h1:vlNjIqmUZ9CMAWsbURYl3a6wZbw7q5RHVvlXTNS/Bs8=
github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o= github.com/huin/goutil v0.0.0-20170803182201-1ca381bf3150/go.mod h1:PpLOETDnJ0o3iZrZfqZzyLl6l7F3c6L1oWn7OICBi6o=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 h1:UDMh68UUwekSh5iP2OMhRRZJiiBccgV7axzUG8vi56c= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6 h1:UDMh68UUwekSh5iP2OMhRRZJiiBccgV7axzUG8vi56c=

View File

@ -30,13 +30,12 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
) )
@ -182,6 +181,18 @@ var (
} }
) )
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
uri := postgres.DbConnectionString(postgres.ConnectionParams{
User: "vdbm",
Password: "password",
Hostname: "localhost",
Name: "vulcanize_testing",
Port: 8077,
})
return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{})
}
var _ = Describe("API", func() { var _ = Describe("API", func() {
var ( var (
db *postgres.DB db *postgres.DB
@ -191,10 +202,14 @@ var _ = Describe("API", func() {
// Test db setup, rather than using BeforeEach we only need to setup once since the tests do not mutate the database // Test db setup, rather than using BeforeEach we only need to setup once since the tests do not mutate the database
// Note: if you focus one of the tests be sure to focus this and the defered It() // Note: if you focus one of the tests be sure to focus this and the defered It()
It("test init", func() { It("test init", func() {
var err error var (
db, err = shared.SetupDB() err error
tx *indexer.BlockTx
)
db, err = SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
indexAndPublisher := eth2.NewIPLDPublisher(db) indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err := eth.NewEthBackend(db, &eth.Config{ backend, err := eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig, ChainConfig: chainConfig,
VmConfig: vm.Config{}, VmConfig: vm.Config{},
@ -202,18 +217,29 @@ var _ = Describe("API", func() {
}) })
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
api = eth.NewPublicEthAPI(backend, nil, false) api = eth.NewPublicEthAPI(backend, nil, false)
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) tx, err = indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
for _, node := range test_helpers.MockStateNodes {
err = indexAndPublisher.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred())
}
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode) err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
uncles := test_helpers.MockBlock.Uncles() uncles := test_helpers.MockBlock.Uncles()
uncleHashes := make([]common.Hash, len(uncles)) uncleHashes := make([]common.Hash, len(uncles))
for i, uncle := range uncles { for i, uncle := range uncles {
uncleHashes[i] = uncle.Hash() uncleHashes[i] = uncle.Hash()
} }
expectedBlock["uncles"] = uncleHashes expectedBlock["uncles"] = uncleHashes
tx, err = indexAndPublisher.PushBlock(test_helpers.MockLondonBlock, test_helpers.MockLondonReceipts, test_helpers.MockLondonBlock.Difficulty())
Expect(err).ToNot(HaveOccurred())
err = indexAndPublisher.Publish(test_helpers.MockConvertedLondonPayload) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })

View File

@ -39,14 +39,12 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
ipfsethdb "github.com/vulcanize/ipfs-ethdb" ipfsethdb "github.com/vulcanize/ipfs-ethdb"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
shared2 "github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
var ( var (
@ -714,7 +712,7 @@ func (b *Backend) GetCodeByHash(ctx context.Context, address common.Address, has
return nil, err return nil, err
} }
var mhKey string var mhKey string
mhKey, err = shared2.MultihashKeyFromKeccak256(common.BytesToHash(codeHash)) mhKey, err = shared.MultihashKeyFromKeccak256(common.BytesToHash(codeHash))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -30,8 +30,7 @@ import (
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
) )
// RPCMarshalHeader converts the given header to the RPC output. // RPCMarshalHeader converts the given header to the RPC output.

View File

@ -22,13 +22,12 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/lib/pq" "github.com/lib/pq"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
@ -86,7 +85,7 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64
}() }()
// Retrieve cached header CIDs at this block height // Retrieve cached header CIDs at this block height
var headers []eth.HeaderModel var headers []models.HeaderModel
headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
@ -102,7 +101,7 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64
empty = false empty = false
if filter.HeaderFilter.Uncles { if filter.HeaderFilter.Uncles {
// Retrieve uncle cids for this header id // Retrieve uncle cids for this header id
var uncleCIDs []eth.UncleModel var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID) uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, header.ID)
if err != nil { if err != nil {
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
@ -166,18 +165,18 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64
} }
// RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight // RetrieveHeaderCIDs retrieves and returns all of the header cids at the provided blockheight
func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]eth.HeaderModel, error) { func (ecr *CIDRetriever) RetrieveHeaderCIDs(tx *sqlx.Tx, blockNumber int64) ([]models.HeaderModel, error) {
log.Debug("retrieving header cids for block ", blockNumber) log.Debug("retrieving header cids for block ", blockNumber)
headers := make([]eth.HeaderModel, 0) headers := make([]models.HeaderModel, 0)
pgStr := `SELECT * FROM eth.header_cids pgStr := `SELECT * FROM eth.header_cids
WHERE block_number = $1` WHERE block_number = $1`
return headers, tx.Select(&headers, pgStr, blockNumber) return headers, tx.Select(&headers, pgStr, blockNumber)
} }
// RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header // RetrieveUncleCIDsByHeaderID retrieves and returns all of the uncle cids for the provided header
func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth.UncleModel, error) { func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.UncleModel, error) {
log.Debug("retrieving uncle cids for block id ", headerID) log.Debug("retrieving uncle cids for block id ", headerID)
headers := make([]eth.UncleModel, 0) headers := make([]models.UncleModel, 0)
pgStr := `SELECT * FROM eth.uncle_cids pgStr := `SELECT * FROM eth.uncle_cids
WHERE header_id = $1` WHERE header_id = $1`
return headers, tx.Select(&headers, pgStr, headerID) return headers, tx.Select(&headers, pgStr, headerID)
@ -185,10 +184,10 @@ func (ecr *CIDRetriever) RetrieveUncleCIDsByHeaderID(tx *sqlx.Tx, headerID int64
// RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters // RetrieveTxCIDs retrieves and returns all of the trx cids at the provided blockheight that conform to the provided filter parameters
// also returns the ids for the returned transaction cids // also returns the ids for the returned transaction cids
func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]eth.TxModel, error) { func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID int64) ([]models.TxModel, error) {
log.Debug("retrieving transaction cids for header id ", headerID) log.Debug("retrieving transaction cids for header id ", headerID)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
results := make([]eth.TxModel, 0) results := make([]models.TxModel, 0)
id := 1 id := 1
pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id, pgStr := fmt.Sprintf(`SELECT transaction_cids.id, transaction_cids.header_id,
transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key, transaction_cids.tx_hash, transaction_cids.cid, transaction_cids.mh_key,
@ -212,7 +211,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided // RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
// filter parameters and correspond to the provided tx ids // filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]eth.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for header id ", headerID) log.Debug("retrieving receipt cids for header id ", headerID)
args := make([]interface{}, 0, 4) args := make([]interface{}, 0, 4)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
@ -286,13 +285,13 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
} }
} }
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]eth.ReceiptModel, 0) receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
} }
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided // RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
// filter parameters and correspond to the provided tx ids // filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]eth.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber) log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5) args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
@ -316,7 +315,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
// TODO: Add the below filters when we have log index in DB. // TODO: Add the below filters when we have log index in DB.
if true { if true {
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]eth.ReceiptModel, 0) receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
} }
@ -383,7 +382,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
} }
pgStr += ` ORDER BY transaction_cids.index` pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]eth.ReceiptModel, 0) receiptCids := make([]models.ReceiptModel, 0)
return receiptCids, tx.Select(&receiptCids, pgStr, args...) return receiptCids, tx.Select(&receiptCids, pgStr, args...)
} }
@ -397,7 +396,7 @@ func hasTopics(topics [][]string) bool {
} }
// RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters // RetrieveStateCIDs retrieves and returns all of the state node cids at the provided header ID that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]eth.StateNodeModel, error) { func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter, headerID int64) ([]models.StateNodeModel, error) {
log.Debug("retrieving state cids for header id ", headerID) log.Debug("retrieving state cids for header id ", headerID)
args := make([]interface{}, 0, 2) args := make([]interface{}, 0, 2)
pgStr := `SELECT state_cids.id, state_cids.header_id, pgStr := `SELECT state_cids.id, state_cids.header_id,
@ -417,12 +416,12 @@ func (ecr *CIDRetriever) RetrieveStateCIDs(tx *sqlx.Tx, stateFilter StateFilter,
if !stateFilter.IntermediateNodes { if !stateFilter.IntermediateNodes {
pgStr += ` AND state_cids.node_type = 2` pgStr += ` AND state_cids.node_type = 2`
} }
stateNodeCIDs := make([]eth.StateNodeModel, 0) stateNodeCIDs := make([]models.StateNodeModel, 0)
return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...) return stateNodeCIDs, tx.Select(&stateNodeCIDs, pgStr, args...)
} }
// RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters // RetrieveStorageCIDs retrieves and returns all of the storage node cids at the provided header id that conform to the provided filter parameters
func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]eth.StorageNodeWithStateKeyModel, error) { func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageFilter, headerID int64) ([]models.StorageNodeWithStateKeyModel, error) {
log.Debug("retrieving storage cids for header id ", headerID) log.Debug("retrieving storage cids for header id ", headerID)
args := make([]interface{}, 0, 3) args := make([]interface{}, 0, 3)
pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type, pgStr := `SELECT storage_cids.id, storage_cids.state_id, storage_cids.storage_leaf_key, storage_cids.node_type,
@ -450,18 +449,18 @@ func (ecr *CIDRetriever) RetrieveStorageCIDs(tx *sqlx.Tx, storageFilter StorageF
if !storageFilter.IntermediateNodes { if !storageFilter.IntermediateNodes {
pgStr += ` AND storage_cids.node_type = 2` pgStr += ` AND storage_cids.node_type = 2`
} }
storageNodeCIDs := make([]eth.StorageNodeWithStateKeyModel, 0) storageNodeCIDs := make([]models.StorageNodeWithStateKeyModel, 0)
return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...) return storageNodeCIDs, tx.Select(&storageNodeCIDs, pgStr, args...)
} }
// RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash // RetrieveBlockByHash returns all of the CIDs needed to compose an entire block, for a given block hash
func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderModel, []eth.UncleModel, []eth.TxModel, []eth.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) {
log.Debug("retrieving block cids for block hash ", blockHash.String()) log.Debug("retrieving block cids for block hash ", blockHash.String())
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if err != nil { if err != nil {
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
@ -474,29 +473,29 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderM
} }
}() }()
var headerCID eth.HeaderModel var headerCID models.HeaderModel
headerCID, err = ecr.RetrieveHeaderCIDByHash(tx, blockHash) headerCID, err = ecr.RetrieveHeaderCIDByHash(tx, blockHash)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
var uncleCIDs []eth.UncleModel var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID) uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID.ID)
if err != nil { if err != nil {
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
var txCIDs []eth.TxModel var txCIDs []models.TxModel
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID) txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID.ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
txIDs := make([]int64, len(txCIDs)) txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs { for i, txCID := range txCIDs {
txIDs[i] = txCID.ID txIDs[i] = txCID.ID
} }
var rctCIDs []eth.ReceiptModel var rctCIDs []models.ReceiptModel
rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs) rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil { if err != nil {
log.Error("rct cid retrieval error") log.Error("rct cid retrieval error")
@ -505,13 +504,13 @@ func (ecr *CIDRetriever) RetrieveBlockByHash(blockHash common.Hash) (eth.HeaderM
} }
// RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number // RetrieveBlockByNumber returns all of the CIDs needed to compose an entire block, for a given block number
func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderModel, []eth.UncleModel, []eth.TxModel, []eth.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (models.HeaderModel, []models.UncleModel, []models.TxModel, []models.ReceiptModel, error) {
log.Debug("retrieving block cids for block number ", blockNumber) log.Debug("retrieving block cids for block number ", blockNumber)
// Begin new db tx // Begin new db tx
tx, err := ecr.db.Beginx() tx, err := ecr.db.Beginx()
if err != nil { if err != nil {
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
defer func() { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
@ -524,32 +523,32 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderMod
} }
}() }()
var headerCID []eth.HeaderModel var headerCID []models.HeaderModel
headerCID, err = ecr.RetrieveHeaderCIDs(tx, blockNumber) headerCID, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil { if err != nil {
log.Error("header cid retrieval error") log.Error("header cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
if len(headerCID) < 1 { if len(headerCID) < 1 {
return eth.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber) return models.HeaderModel{}, nil, nil, nil, fmt.Errorf("header cid retrieval error, no header CIDs found at block %d", blockNumber)
} }
var uncleCIDs []eth.UncleModel var uncleCIDs []models.UncleModel
uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID) uncleCIDs, err = ecr.RetrieveUncleCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil { if err != nil {
log.Error("uncle cid retrieval error") log.Error("uncle cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
var txCIDs []eth.TxModel var txCIDs []models.TxModel
txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID) txCIDs, err = ecr.RetrieveTxCIDsByHeaderID(tx, headerCID[0].ID)
if err != nil { if err != nil {
log.Error("tx cid retrieval error") log.Error("tx cid retrieval error")
return eth.HeaderModel{}, nil, nil, nil, err return models.HeaderModel{}, nil, nil, nil, err
} }
txIDs := make([]int64, len(txCIDs)) txIDs := make([]int64, len(txCIDs))
for i, txCID := range txCIDs { for i, txCID := range txCIDs {
txIDs[i] = txCID.ID txIDs[i] = txCID.ID
} }
var rctCIDs []eth.ReceiptModel var rctCIDs []models.ReceiptModel
rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs) rctCIDs, err = ecr.RetrieveReceiptCIDsByTxIDs(tx, txIDs)
if err != nil { if err != nil {
log.Error("rct cid retrieval error") log.Error("rct cid retrieval error")
@ -558,26 +557,26 @@ func (ecr *CIDRetriever) RetrieveBlockByNumber(blockNumber int64) (eth.HeaderMod
} }
// RetrieveHeaderCIDByHash returns the header for the given block hash // RetrieveHeaderCIDByHash returns the header for the given block hash
func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (eth.HeaderModel, error) { func (ecr *CIDRetriever) RetrieveHeaderCIDByHash(tx *sqlx.Tx, blockHash common.Hash) (models.HeaderModel, error) {
log.Debug("retrieving header cids for block hash ", blockHash.String()) log.Debug("retrieving header cids for block hash ", blockHash.String())
pgStr := `SELECT * FROM eth.header_cids pgStr := `SELECT * FROM eth.header_cids
WHERE block_hash = $1` WHERE block_hash = $1`
var headerCID eth.HeaderModel var headerCID models.HeaderModel
return headerCID, tx.Get(&headerCID, pgStr, blockHash.String()) return headerCID, tx.Get(&headerCID, pgStr, blockHash.String())
} }
// RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id // RetrieveTxCIDsByHeaderID retrieves all tx CIDs for the given header id
func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]eth.TxModel, error) { func (ecr *CIDRetriever) RetrieveTxCIDsByHeaderID(tx *sqlx.Tx, headerID int64) ([]models.TxModel, error) {
log.Debug("retrieving tx cids for block id ", headerID) log.Debug("retrieving tx cids for block id ", headerID)
pgStr := `SELECT * FROM eth.transaction_cids pgStr := `SELECT * FROM eth.transaction_cids
WHERE header_id = $1 WHERE header_id = $1
ORDER BY index` ORDER BY index`
var txCIDs []eth.TxModel var txCIDs []models.TxModel
return txCIDs, tx.Select(&txCIDs, pgStr, headerID) return txCIDs, tx.Select(&txCIDs, pgStr, headerID)
} }
// RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs // RetrieveReceiptCIDsByTxIDs retrieves receipt CIDs by their associated tx IDs
func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]eth.ReceiptModel, error) { func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64) ([]models.ReceiptModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs) log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key, pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s, receipt_cids.contract, receipt_cids.contract_hash, receipt_cids.topic0s, receipt_cids.topic1s,
@ -586,6 +585,6 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64)
WHERE tx_id = ANY($1::INTEGER[]) WHERE tx_id = ANY($1::INTEGER[])
AND receipt_cids.tx_id = transaction_cids.id AND receipt_cids.tx_id = transaction_cids.id
ORDER BY transaction_cids.index` ORDER BY transaction_cids.index`
var rctCIDs []eth.ReceiptModel var rctCIDs []models.ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs)) return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
} }

View File

@ -19,17 +19,17 @@ package eth_test
import ( import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
) )
@ -211,15 +211,15 @@ var (
var _ = Describe("Retriever", func() { var _ = Describe("Retriever", func() {
var ( var (
db *postgres.DB db *postgres.DB
repo *eth2.IPLDPublisher diffIndexer *indexer.StateDiffIndexer
retriever *eth.CIDRetriever retriever *eth.CIDRetriever
) )
BeforeEach(func() { BeforeEach(func() {
var err error var err error
db, err = shared.SetupDB() db, err = SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
repo = eth2.NewIPLDPublisher(db) diffIndexer = indexer.NewStateDiffIndexer(params.TestChainConfig, db)
retriever = eth.NewCIDRetriever(db) retriever = eth.NewCIDRetriever(db)
}) })
AfterEach(func() { AfterEach(func() {
@ -228,7 +228,14 @@ var _ = Describe("Retriever", func() {
Describe("Retrieve", func() { Describe("Retrieve", func() {
BeforeEach(func() { BeforeEach(func() {
err := repo.Publish(test_helpers.MockConvertedPayload) tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred())
for _, node := range test_helpers.MockStateNodes {
err = diffIndexer.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred())
}
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() { It("Retrieves all CIDs for the given blocknumber when provided an open filter", func() {
@ -277,7 +284,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids1)).To(Equal(1)) Expect(len(cids1)).To(Equal(1))
Expect(cids1[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids1[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids1[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids1[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids1[0].Transactions)).To(Equal(0)) Expect(len(cids1[0].Transactions)).To(Equal(0))
Expect(len(cids1[0].StateNodes)).To(Equal(0)) Expect(len(cids1[0].StateNodes)).To(Equal(0))
Expect(len(cids1[0].StorageNodes)).To(Equal(0)) Expect(len(cids1[0].StorageNodes)).To(Equal(0))
@ -292,7 +299,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids2)).To(Equal(1)) Expect(len(cids2)).To(Equal(1))
Expect(cids2[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids2[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids2[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids2[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids2[0].Transactions)).To(Equal(0)) Expect(len(cids2[0].Transactions)).To(Equal(0))
Expect(len(cids2[0].StateNodes)).To(Equal(0)) Expect(len(cids2[0].StateNodes)).To(Equal(0))
Expect(len(cids2[0].StorageNodes)).To(Equal(0)) Expect(len(cids2[0].StorageNodes)).To(Equal(0))
@ -307,7 +314,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids3)).To(Equal(1)) Expect(len(cids3)).To(Equal(1))
Expect(cids3[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids3[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids3[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids3[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids3[0].Transactions)).To(Equal(0)) Expect(len(cids3[0].Transactions)).To(Equal(0))
Expect(len(cids3[0].StateNodes)).To(Equal(0)) Expect(len(cids3[0].StateNodes)).To(Equal(0))
Expect(len(cids3[0].StorageNodes)).To(Equal(0)) Expect(len(cids3[0].StorageNodes)).To(Equal(0))
@ -322,7 +329,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids4)).To(Equal(1)) Expect(len(cids4)).To(Equal(1))
Expect(cids4[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids4[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids4[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids4[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids4[0].Transactions)).To(Equal(0)) Expect(len(cids4[0].Transactions)).To(Equal(0))
Expect(len(cids4[0].StateNodes)).To(Equal(0)) Expect(len(cids4[0].StateNodes)).To(Equal(0))
Expect(len(cids4[0].StorageNodes)).To(Equal(0)) Expect(len(cids4[0].StorageNodes)).To(Equal(0))
@ -337,7 +344,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids5)).To(Equal(1)) Expect(len(cids5)).To(Equal(1))
Expect(cids5[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids5[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids5[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids5[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids5[0].Transactions)).To(Equal(3)) Expect(len(cids5[0].Transactions)).To(Equal(3))
Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx1CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx1CID.String())).To(BeTrue())
Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx2CID.String())).To(BeTrue()) Expect(eth.TxModelsContainsCID(cids5[0].Transactions, test_helpers.Trx2CID.String())).To(BeTrue())
@ -354,7 +361,7 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids6)).To(Equal(1)) Expect(len(cids6)).To(Equal(1))
Expect(cids6[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids6[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids6[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids6[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids6[0].Transactions)).To(Equal(1)) Expect(len(cids6[0].Transactions)).To(Equal(1))
expectedTxCID := test_helpers.MockCIDWrapper.Transactions[1] expectedTxCID := test_helpers.MockCIDWrapper.Transactions[1]
expectedTxCID.ID = cids6[0].Transactions[0].ID expectedTxCID.ID = cids6[0].Transactions[0].ID
@ -373,12 +380,12 @@ var _ = Describe("Retriever", func() {
Expect(empty).ToNot(BeTrue()) Expect(empty).ToNot(BeTrue())
Expect(len(cids7)).To(Equal(1)) Expect(len(cids7)).To(Equal(1))
Expect(cids7[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber)) Expect(cids7[0].BlockNumber).To(Equal(test_helpers.MockCIDWrapper.BlockNumber))
Expect(cids7[0].Header).To(Equal(eth2.HeaderModel{})) Expect(cids7[0].Header).To(Equal(models.HeaderModel{}))
Expect(len(cids7[0].Transactions)).To(Equal(0)) Expect(len(cids7[0].Transactions)).To(Equal(0))
Expect(len(cids7[0].Receipts)).To(Equal(0)) Expect(len(cids7[0].Receipts)).To(Equal(0))
Expect(len(cids7[0].StorageNodes)).To(Equal(0)) Expect(len(cids7[0].StorageNodes)).To(Equal(0))
Expect(len(cids7[0].StateNodes)).To(Equal(1)) Expect(len(cids7[0].StateNodes)).To(Equal(1))
Expect(cids7[0].StateNodes[0]).To(Equal(eth2.StateNodeModel{ Expect(cids7[0].StateNodes[0]).To(Equal(models.StateNodeModel{
ID: cids7[0].StateNodes[0].ID, ID: cids7[0].StateNodes[0].ID,
HeaderID: cids7[0].StateNodes[0].HeaderID, HeaderID: cids7[0].StateNodes[0].HeaderID,
NodeType: 2, NodeType: 2,
@ -400,8 +407,12 @@ var _ = Describe("Retriever", func() {
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
It("Gets the number of the first block that has data in the database", func() { It("Gets the number of the first block that has data in the database", func() {
err := repo.Publish(test_helpers.MockConvertedPayload) tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1))) Expect(num).To(Equal(int64(1)))
@ -410,8 +421,12 @@ var _ = Describe("Retriever", func() {
It("Gets the number of the first block that has data in the database", func() { It("Gets the number of the first block that has data in the database", func() {
payload := test_helpers.MockConvertedPayload payload := test_helpers.MockConvertedPayload
payload.Block = newMockBlock(1010101) payload.Block = newMockBlock(1010101)
err := repo.Publish(payload) tx, err := diffIndexer.PushBlock(payload.Block, payload.Receipts, payload.Block.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))
@ -422,10 +437,16 @@ var _ = Describe("Retriever", func() {
payload1.Block = newMockBlock(1010101) payload1.Block = newMockBlock(1010101)
payload2 := payload1 payload2 := payload1
payload2.Block = newMockBlock(5) payload2.Block = newMockBlock(5)
err := repo.Publish(payload1) tx, err := diffIndexer.PushBlock(payload1.Block, payload1.Receipts, payload1.Block.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Publish(payload2) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
tx, err = diffIndexer.PushBlock(payload2.Block, payload2.Receipts, payload2.Block.Difficulty())
Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveFirstBlockNumber() num, err := retriever.RetrieveFirstBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(5))) Expect(num).To(Equal(int64(5)))
@ -438,8 +459,11 @@ var _ = Describe("Retriever", func() {
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
}) })
It("Gets the number of the latest block that has data in the database", func() { It("Gets the number of the latest block that has data in the database", func() {
err := repo.Publish(test_helpers.MockConvertedPayload) tx, err := diffIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1))) Expect(num).To(Equal(int64(1)))
@ -448,8 +472,12 @@ var _ = Describe("Retriever", func() {
It("Gets the number of the latest block that has data in the database", func() { It("Gets the number of the latest block that has data in the database", func() {
payload := test_helpers.MockConvertedPayload payload := test_helpers.MockConvertedPayload
payload.Block = newMockBlock(1010101) payload.Block = newMockBlock(1010101)
err := repo.Publish(payload) tx, err := diffIndexer.PushBlock(payload.Block, payload.Receipts, payload.Block.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))
@ -460,10 +488,16 @@ var _ = Describe("Retriever", func() {
payload1.Block = newMockBlock(1010101) payload1.Block = newMockBlock(1010101)
payload2 := payload1 payload2 := payload1
payload2.Block = newMockBlock(5) payload2.Block = newMockBlock(5)
err := repo.Publish(payload1) tx, err := diffIndexer.PushBlock(payload1.Block, payload1.Receipts, payload1.Block.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = repo.Publish(payload2) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
tx, err = diffIndexer.PushBlock(payload2.Block, payload2.Receipts, payload2.Block.Difficulty())
Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
num, err := retriever.RetrieveLastBlockNumber() num, err := retriever.RetrieveLastBlockNumber()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(num).To(Equal(int64(1010101))) Expect(num).To(Equal(int64(1010101)))

View File

@ -29,15 +29,14 @@ import (
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
) )
@ -74,9 +73,9 @@ var _ = Describe("eth state reading tests", func() {
It("test init", func() { It("test init", func() {
// db and type initializations // db and type initializations
var err error var err error
db, err = shared.SetupDB() db, err = SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
transformer := eth2.NewStateDiffTransformer(chainConfig, db) transformer := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err = eth.NewEthBackend(db, &eth.Config{ backend, err = eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig, ChainConfig: chainConfig,
VmConfig: vm.Config{}, VmConfig: vm.Config{},
@ -135,31 +134,39 @@ var _ = Describe("eth state reading tests", func() {
} }
diff, err := builder.BuildStateDiffObject(args, params) diff, err := builder.BuildStateDiffObject(args, params)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
diffRlp, err := rlp.EncodeToBytes(diff) tx, err := transformer.PushBlock(block, rcts, mockTD)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockRlp, err := rlp.EncodeToBytes(block)
Expect(err).ToNot(HaveOccurred()) for _, node := range diff.Nodes {
receiptsRlp, err := rlp.EncodeToBytes(rcts) err = transformer.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
payload := statediff.Payload{
StateObjectRlp: diffRlp,
BlockRlp: blockRlp,
ReceiptsRlp: receiptsRlp,
TotalDifficulty: mockTD,
} }
_, err = transformer.Transform(0, payload) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
} }
// Insert some non-canonical data into the database so that we test our ability to discern canonicity // Insert some non-canonical data into the database so that we test our ability to discern canonicity
indexAndPublisher := eth2.NewIPLDPublisher(db) indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db)
api = eth.NewPublicEthAPI(backend, nil, false)
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) tx, err := indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred())
// The non-canonical header has a child // The non-canonical header has a child
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayloadForChild) tx, err = indexAndPublisher.PushBlock(test_helpers.MockChild, test_helpers.MockReceipts, test_helpers.MockChild.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode)
hash := sdtypes.CodeAndCodeHash{
Hash: test_helpers.CodeHash,
Code: test_helpers.ContractCode,
}
err = indexAndPublisher.PushCodeAndCodeHash(tx, hash)
Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}) })
defer It("test teardown", func() { defer It("test teardown", func() {

View File

@ -19,6 +19,7 @@ package eth
import ( import (
"bytes" "bytes"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -27,14 +28,12 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
) )
// Filterer interface for substituing mocks in tests // Filterer interface for substituing mocks in tests
type Filterer interface { type Filterer interface {
Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*IPLDs, error) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error)
} }
// ResponseFilterer satisfies the ResponseFilterer interface for ethereum // ResponseFilterer satisfies the ResponseFilterer interface for ethereum
@ -46,7 +45,7 @@ func NewResponseFilterer() *ResponseFilterer {
} }
// Filter is used to filter through eth data to extract and package requested data into a Payload // Filter is used to filter through eth data to extract and package requested data into a Payload
func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.ConvertedPayload) (*IPLDs, error) { func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload ConvertedPayload) (*IPLDs, error) {
if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) { if checkRange(filter.Start.Int64(), filter.End.Int64(), payload.Block.Number().Int64()) {
response := new(IPLDs) response := new(IPLDs)
response.TotalDifficulty = payload.TotalDifficulty response.TotalDifficulty = payload.TotalDifficulty
@ -73,7 +72,7 @@ func (s *ResponseFilterer) Filter(filter SubscriptionSettings, payload eth.Conve
return nil, nil return nil, nil
} }
func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload eth.ConvertedPayload) error { func (s *ResponseFilterer) filterHeaders(headerFilter HeaderFilter, response *IPLDs, payload ConvertedPayload) error {
if !headerFilter.Off { if !headerFilter.Off {
headerRLP, err := rlp.EncodeToBytes(payload.Block.Header()) headerRLP, err := rlp.EncodeToBytes(payload.Block.Header())
if err != nil { if err != nil {
@ -115,7 +114,7 @@ func checkRange(start, end, actual int64) bool {
return false return false
} }
func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload eth.ConvertedPayload) ([]common.Hash, error) { func (s *ResponseFilterer) filterTransactions(trxFilter TxFilter, response *IPLDs, payload ConvertedPayload) ([]common.Hash, error) {
var trxHashes []common.Hash var trxHashes []common.Hash
if !trxFilter.Off { if !trxFilter.Off {
trxLen := len(payload.Block.Body().Transactions) trxLen := len(payload.Block.Body().Transactions)
@ -163,7 +162,7 @@ func checkTransactionAddrs(wantedSrc, wantedDst []string, actualSrc, actualDst s
return false return false
} }
func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload eth.ConvertedPayload, trxHashes []common.Hash) error { func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *IPLDs, payload ConvertedPayload, trxHashes []common.Hash) error {
if !receiptFilter.Off { if !receiptFilter.Off {
response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts)) response.Receipts = make([]ipfs.BlockModel, 0, len(payload.Receipts))
for i, receipt := range payload.Receipts { for i, receipt := range payload.Receipts {
@ -253,7 +252,7 @@ func slicesShareString(slice1, slice2 []string) int {
} }
// filterStateAndStorage filters state and storage nodes into the response according to the provided filters // filterStateAndStorage filters state and storage nodes into the response according to the provided filters
func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload eth.ConvertedPayload) error { func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storageFilter StorageFilter, response *IPLDs, payload ConvertedPayload) error {
response.StateNodes = make([]StateNode, 0, len(payload.StateNodes)) response.StateNodes = make([]StateNode, 0, len(payload.StateNodes))
response.StorageNodes = make([]StorageNode, 0) response.StorageNodes = make([]StorageNode, 0)
stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses)) stateAddressFilters := make([]common.Hash, len(stateFilter.Addresses))
@ -270,37 +269,37 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
} }
for _, stateNode := range payload.StateNodes { for _, stateNode := range payload.StateNodes {
if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) { if !stateFilter.Off && checkNodeKeys(stateAddressFilters, stateNode.LeafKey) {
if stateNode.Type == sdtypes.Leaf || stateFilter.IntermediateNodes { if stateNode.NodeType == sdtypes.Leaf || stateFilter.IntermediateNodes {
cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.Value, multihash.KECCAK_256) cid, err := ipld.RawdataToCid(ipld.MEthStateTrie, stateNode.NodeValue, multihash.KECCAK_256)
if err != nil { if err != nil {
return err return err
} }
response.StateNodes = append(response.StateNodes, StateNode{ response.StateNodes = append(response.StateNodes, StateNode{
StateLeafKey: stateNode.LeafKey, StateLeafKey: common.BytesToHash(stateNode.LeafKey),
Path: stateNode.Path, Path: stateNode.Path,
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{
Data: stateNode.Value, Data: stateNode.NodeValue,
CID: cid.String(), CID: cid.String(),
}, },
Type: stateNode.Type, Type: stateNode.NodeType,
}) })
} }
} }
if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) { if !storageFilter.Off && checkNodeKeys(storageAddressFilters, stateNode.LeafKey) {
for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] { for _, storageNode := range payload.StorageNodes[common.Bytes2Hex(stateNode.Path)] {
if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) { if checkNodeKeys(storageKeyFilters, storageNode.LeafKey) {
cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.Value, multihash.KECCAK_256) cid, err := ipld.RawdataToCid(ipld.MEthStorageTrie, storageNode.NodeValue, multihash.KECCAK_256)
if err != nil { if err != nil {
return err return err
} }
response.StorageNodes = append(response.StorageNodes, StorageNode{ response.StorageNodes = append(response.StorageNodes, StorageNode{
StateLeafKey: stateNode.LeafKey, StateLeafKey: common.BytesToHash(stateNode.LeafKey),
StorageLeafKey: storageNode.LeafKey, StorageLeafKey: common.BytesToHash(storageNode.LeafKey),
IPLD: ipfs.BlockModel{ IPLD: ipfs.BlockModel{
Data: storageNode.Value, Data: storageNode.NodeValue,
CID: cid.String(), CID: cid.String(),
}, },
Type: storageNode.Type, Type: storageNode.NodeType,
Path: storageNode.Path, Path: storageNode.Path,
}) })
} }
@ -310,13 +309,13 @@ func (s *ResponseFilterer) filterStateAndStorage(stateFilter StateFilter, storag
return nil return nil
} }
func checkNodeKeys(wantedKeys []common.Hash, actualKey common.Hash) bool { func checkNodeKeys(wantedKeys []common.Hash, actualKey []byte) bool {
// If we aren't filtering for any specific keys, all nodes are a go // If we aren't filtering for any specific keys, all nodes are a go
if len(wantedKeys) == 0 { if len(wantedKeys) == 0 {
return true return true
} }
for _, key := range wantedKeys { for _, key := range wantedKeys {
if bytes.Equal(key.Bytes(), actualKey.Bytes()) { if bytes.Equal(key.Bytes(), actualKey) {
return true return true
} }
} }

View File

@ -19,13 +19,12 @@ package eth_test
import ( import (
"bytes" "bytes"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"

View File

@ -22,13 +22,12 @@ import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/shared" "github.com/vulcanize/ipld-eth-server/pkg/shared"
) )
@ -104,7 +103,7 @@ func (f *IPLDFetcher) Fetch(cids CIDWrapper) (*IPLDs, error) {
} }
// FetchHeaders fetches headers // FetchHeaders fetches headers
func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c models.HeaderModel) (ipfs.BlockModel, error) {
log.Debug("fetching header ipld") log.Debug("fetching header ipld")
headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey) headerBytes, err := shared.FetchIPLDByMhKey(tx, c.MhKey)
if err != nil { if err != nil {
@ -117,7 +116,7 @@ func (f *IPLDFetcher) FetchHeader(tx *sqlx.Tx, c eth.HeaderModel) (ipfs.BlockMod
} }
// FetchUncles fetches uncles // FetchUncles fetches uncles
func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []models.UncleModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching uncle iplds") log.Debug("fetching uncle iplds")
uncleIPLDs := make([]ipfs.BlockModel, len(cids)) uncleIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -134,7 +133,7 @@ func (f *IPLDFetcher) FetchUncles(tx *sqlx.Tx, cids []eth.UncleModel) ([]ipfs.Bl
} }
// FetchTrxs fetches transactions // FetchTrxs fetches transactions
func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []models.TxModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching transaction iplds") log.Debug("fetching transaction iplds")
trxIPLDs := make([]ipfs.BlockModel, len(cids)) trxIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -151,7 +150,7 @@ func (f *IPLDFetcher) FetchTrxs(tx *sqlx.Tx, cids []eth.TxModel) ([]ipfs.BlockMo
} }
// FetchRcts fetches receipts // FetchRcts fetches receipts
func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.BlockModel, error) { func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs.BlockModel, error) {
log.Debug("fetching receipt iplds") log.Debug("fetching receipt iplds")
rctIPLDs := make([]ipfs.BlockModel, len(cids)) rctIPLDs := make([]ipfs.BlockModel, len(cids))
for i, c := range cids { for i, c := range cids {
@ -168,7 +167,7 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []eth.ReceiptModel) ([]ipfs.Bl
} }
// FetchState fetches state nodes // FetchState fetches state nodes
func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]StateNode, error) { func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []models.StateNodeModel) ([]StateNode, error) {
log.Debug("fetching state iplds") log.Debug("fetching state iplds")
stateNodes := make([]StateNode, 0, len(cids)) stateNodes := make([]StateNode, 0, len(cids))
for _, stateNode := range cids { for _, stateNode := range cids {
@ -193,7 +192,7 @@ func (f *IPLDFetcher) FetchState(tx *sqlx.Tx, cids []eth.StateNodeModel) ([]Stat
} }
// FetchStorage fetches storage nodes // FetchStorage fetches storage nodes
func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []eth.StorageNodeWithStateKeyModel) ([]StorageNode, error) { func (f *IPLDFetcher) FetchStorage(tx *sqlx.Tx, cids []models.StorageNodeWithStateKeyModel) ([]StorageNode, error) {
log.Debug("fetching storage iplds") log.Debug("fetching storage iplds")
storageNodes := make([]StorageNode, 0, len(cids)) storageNodes := make([]StorageNode, 0, len(cids))
for _, storageNode := range cids { for _, storageNode := range cids {

View File

@ -17,12 +17,11 @@
package eth_test package eth_test
import ( import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
@ -31,18 +30,28 @@ import (
var _ = Describe("IPLDFetcher", func() { var _ = Describe("IPLDFetcher", func() {
var ( var (
db *postgres.DB db *postgres.DB
pubAndIndexer *eth2.IPLDPublisher pubAndIndexer *indexer.StateDiffIndexer
fetcher *eth.IPLDFetcher fetcher *eth.IPLDFetcher
) )
Describe("Fetch", func() { Describe("Fetch", func() {
BeforeEach(func() { BeforeEach(func() {
var err error var (
db, err = shared.SetupDB() err error
tx *indexer.BlockTx
)
db, err = SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
pubAndIndexer = eth2.NewIPLDPublisher(db) pubAndIndexer = indexer.NewStateDiffIndexer(params.TestChainConfig, db)
err = pubAndIndexer.Publish(test_helpers.MockConvertedPayload) tx, err = pubAndIndexer.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
for _, node := range test_helpers.MockStateNodes {
err = pubAndIndexer.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred())
}
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
fetcher = eth.NewIPLDFetcher(db) fetcher = eth.NewIPLDFetcher(db)
}) })
AfterEach(func() { AfterEach(func() {
eth.TearDownDB(db) eth.TearDownDB(db)

View File

@ -22,9 +22,8 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/lib/pq" "github.com/lib/pq"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
) )
const ( const (

View File

@ -17,10 +17,9 @@
package eth package eth
import ( import (
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
) )
// TearDownDB is used to tear down the watcher dbs after tests // TearDownDB is used to tear down the watcher dbs after tests
@ -46,7 +45,7 @@ func TearDownDB(db *postgres.DB) {
} }
// TxModelsContainsCID used to check if a list of TxModels contains a specific cid string // TxModelsContainsCID used to check if a list of TxModels contains a specific cid string
func TxModelsContainsCID(txs []eth.TxModel, cid string) bool { func TxModelsContainsCID(txs []models.TxModel, cid string) bool {
for _, tx := range txs { for _, tx := range txs {
if tx.CID == cid { if tx.CID == cid {
return true return true
@ -56,7 +55,7 @@ func TxModelsContainsCID(txs []eth.TxModel, cid string) bool {
} }
// ListContainsBytes used to check if a list of byte arrays contains a particular byte array // ListContainsBytes used to check if a list of byte arrays contains a particular byte array
func ReceiptModelsContainsCID(rcts []eth.ReceiptModel, cid string) bool { func ReceiptModelsContainsCID(rcts []models.ReceiptModel, cid string) bool {
for _, rct := range rcts { for _, rct := range rcts {
if rct.CID == cid { if rct.CID == cid {
return true return true

View File

@ -23,11 +23,14 @@ import (
"crypto/rand" "crypto/rand"
"math/big" "math/big"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -39,11 +42,7 @@ import (
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
eth2 "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )
// Test variables // Test variables
@ -149,7 +148,7 @@ var (
State2MhKey = shared.MultihashKeyFromCID(State2CID) State2MhKey = shared.MultihashKeyFromCID(State2CID)
StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256) StorageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, StorageLeafNode, multihash.KECCAK_256)
StorageMhKey = shared.MultihashKeyFromCID(StorageCID) StorageMhKey = shared.MultihashKeyFromCID(StorageCID)
MockTrxMeta = []eth.TxModel{ MockTrxMeta = []models.TxModel{
{ {
CID: "", // This is empty until we go to publish to ipfs CID: "", // This is empty until we go to publish to ipfs
MhKey: "", MhKey: "",
@ -178,7 +177,7 @@ var (
Data: MockContractByteCode, Data: MockContractByteCode,
}, },
} }
MockTrxMetaPostPublsh = []eth.TxModel{ MockTrxMetaPostPublsh = []models.TxModel{
{ {
CID: Trx1CID.String(), // This is empty until we go to publish to ipfs CID: Trx1CID.String(), // This is empty until we go to publish to ipfs
MhKey: Trx1MhKey, MhKey: Trx1MhKey,
@ -207,7 +206,7 @@ var (
Data: MockContractByteCode, Data: MockContractByteCode,
}, },
} }
MockRctMeta = []eth.ReceiptModel{ MockRctMeta = []models.ReceiptModel{
{ {
CID: "", CID: "",
MhKey: "", MhKey: "",
@ -246,7 +245,7 @@ var (
LogContracts: []string{}, LogContracts: []string{},
}, },
} }
MockRctMetaPostPublish = []eth.ReceiptModel{ MockRctMetaPostPublish = []models.ReceiptModel{
{ {
CID: Rct1CID.String(), CID: Rct1CID.String(),
MhKey: Rct1MhKey, MhKey: Rct1MhKey,
@ -331,21 +330,30 @@ var (
Account, Account,
}) })
MockStateNodes = []eth.TrieNode{ MockStateNodes = []sdtypes.StateNode{
{ {
LeafKey: common.BytesToHash(ContractLeafKey), LeafKey: ContractLeafKey,
Path: []byte{'\x06'}, Path: []byte{'\x06'},
Value: ContractLeafNode, NodeValue: ContractLeafNode,
Type: sdtypes.Leaf, NodeType: sdtypes.Leaf,
StorageNodes: []sdtypes.StorageNode{
{
Path: []byte{},
NodeType: sdtypes.Leaf,
LeafKey: StorageLeafKey,
NodeValue: StorageLeafNode,
},
},
}, },
{ {
LeafKey: common.BytesToHash(AccountLeafKey), LeafKey: AccountLeafKey,
Path: []byte{'\x0c'}, Path: []byte{'\x0c'},
Value: AccountLeafNode, NodeValue: AccountLeafNode,
Type: sdtypes.Leaf, NodeType: sdtypes.Leaf,
StorageNodes: []sdtypes.StorageNode{},
}, },
} }
MockStateMetaPostPublish = []eth.StateNodeModel{ MockStateMetaPostPublish = []models.StateNodeModel{
{ {
CID: State1CID.String(), CID: State1CID.String(),
MhKey: State1MhKey, MhKey: State1MhKey,
@ -361,13 +369,13 @@ var (
StateKey: common.BytesToHash(AccountLeafKey).Hex(), StateKey: common.BytesToHash(AccountLeafKey).Hex(),
}, },
} }
MockStorageNodes = map[string][]eth.TrieNode{ MockStorageNodes = map[string][]sdtypes.StorageNode{
contractPath: { contractPath: {
{ {
LeafKey: common.BytesToHash(StorageLeafKey), LeafKey: StorageLeafKey,
Value: StorageLeafNode, NodeValue: StorageLeafNode,
Type: sdtypes.Leaf, NodeType: sdtypes.Leaf,
Path: []byte{}, Path: []byte{},
}, },
}, },
} }
@ -391,11 +399,11 @@ var (
StateNodes: MockStateNodes, StateNodes: MockStateNodes,
} }
Reward = eth.CalcEthBlockReward(MockBlock.Header(), MockBlock.Uncles(), MockBlock.Transactions(), MockReceipts) Reward = indexer.CalcEthBlockReward(MockBlock.Header(), MockBlock.Uncles(), MockBlock.Transactions(), MockReceipts)
MockCIDWrapper = &eth2.CIDWrapper{ MockCIDWrapper = &eth.CIDWrapper{
BlockNumber: new(big.Int).Set(BlockNumber), BlockNumber: new(big.Int).Set(BlockNumber),
Header: eth.HeaderModel{ Header: models.HeaderModel{
BlockNumber: "1", BlockNumber: "1",
BlockHash: MockBlock.Hash().String(), BlockHash: MockBlock.Hash().String(),
ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000", ParentHash: "0x0000000000000000000000000000000000000000000000000000000000000000",
@ -413,9 +421,9 @@ var (
}, },
Transactions: MockTrxMetaPostPublsh, Transactions: MockTrxMetaPostPublsh,
Receipts: MockRctMetaPostPublish, Receipts: MockRctMetaPostPublish,
Uncles: []eth.UncleModel{}, Uncles: []models.UncleModel{},
StateNodes: MockStateMetaPostPublish, StateNodes: MockStateMetaPostPublish,
StorageNodes: []eth.StorageNodeWithStateKeyModel{ StorageNodes: []models.StorageNodeWithStateKeyModel{
{ {
Path: []byte{}, Path: []byte{},
CID: StorageCID.String(), CID: StorageCID.String(),
@ -438,7 +446,7 @@ var (
State2IPLD, _ = blocks.NewBlockWithCid(AccountLeafNode, State2CID) State2IPLD, _ = blocks.NewBlockWithCid(AccountLeafNode, State2CID)
StorageIPLD, _ = blocks.NewBlockWithCid(StorageLeafNode, StorageCID) StorageIPLD, _ = blocks.NewBlockWithCid(StorageLeafNode, StorageCID)
MockIPLDs = eth2.IPLDs{ MockIPLDs = eth.IPLDs{
BlockNumber: new(big.Int).Set(BlockNumber), BlockNumber: new(big.Int).Set(BlockNumber),
Header: ipfs.BlockModel{ Header: ipfs.BlockModel{
Data: HeaderIPLD.RawData(), Data: HeaderIPLD.RawData(),
@ -472,7 +480,7 @@ var (
CID: Rct3IPLD.Cid().String(), CID: Rct3IPLD.Cid().String(),
}, },
}, },
StateNodes: []eth2.StateNode{ StateNodes: []eth.StateNode{
{ {
StateLeafKey: common.BytesToHash(ContractLeafKey), StateLeafKey: common.BytesToHash(ContractLeafKey),
Type: sdtypes.Leaf, Type: sdtypes.Leaf,
@ -492,7 +500,7 @@ var (
Path: []byte{'\x0c'}, Path: []byte{'\x0c'},
}, },
}, },
StorageNodes: []eth2.StorageNode{ StorageNodes: []eth.StorageNode{
{ {
StateLeafKey: common.BytesToHash(ContractLeafKey), StateLeafKey: common.BytesToHash(ContractLeafKey),
StorageLeafKey: common.BytesToHash(StorageLeafKey), StorageLeafKey: common.BytesToHash(StorageLeafKey),
@ -518,7 +526,7 @@ var (
MockLondonTransactions, MockLondonReceipts, SenderAdd = createDynamicTransactionsAndReceipts(LondonBlockNum) MockLondonTransactions, MockLondonReceipts, SenderAdd = createDynamicTransactionsAndReceipts(LondonBlockNum)
MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie)) MockLondonBlock = createNewBlock(&MockLondonHeader, MockLondonTransactions, nil, MockLondonReceipts, new(trie.Trie))
MockLondonTrxMeta = []eth.TxModel{ MockLondonTrxMeta = []models.TxModel{
{ {
CID: "", // This is empty until we go to publish to ipfs CID: "", // This is empty until we go to publish to ipfs
MhKey: "", MhKey: "",
@ -529,7 +537,7 @@ var (
Data: []byte{}, Data: []byte{},
}, },
} }
MockLondonRctMeta = []eth.ReceiptModel{ MockLondonRctMeta = []models.ReceiptModel{
{ {
CID: "", CID: "",
MhKey: "", MhKey: "",
@ -605,7 +613,7 @@ func createDynamicTransactionsAndReceipts(blockNumber *big.Int) (types.Transacti
// TODO: Change the receipt type to DynamicFeeTxType once this PR is merged. // TODO: Change the receipt type to DynamicFeeTxType once this PR is merged.
// https://github.com/ethereum/go-ethereum/pull/22806 // https://github.com/ethereum/go-ethereum/pull/22806
mockReceipt1 := &types.Receipt{ mockReceipt1 := &types.Receipt{
Type: types.AccessListTxType, Type: types.DynamicFeeTxType,
PostState: common.HexToHash("0x1").Bytes(), PostState: common.HexToHash("0x1").Bytes(),
Status: types.ReceiptStatusSuccessful, Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 50, CumulativeGasUsed: 50,

View File

@ -22,9 +22,10 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
sdtypes "github.com/ethereum/go-ethereum/statediff/types" sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs"
) )
// RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction // RPCTransaction represents a transaction that will serialize to the RPC representation of a transaction
@ -146,10 +147,23 @@ type StorageNode struct {
// Passed to IPLDFetcher // Passed to IPLDFetcher
type CIDWrapper struct { type CIDWrapper struct {
BlockNumber *big.Int BlockNumber *big.Int
Header eth.HeaderModel Header models.HeaderModel
Uncles []eth.UncleModel Uncles []models.UncleModel
Transactions []eth.TxModel Transactions []models.TxModel
Receipts []eth.ReceiptModel Receipts []models.ReceiptModel
StateNodes []eth.StateNodeModel StateNodes []models.StateNodeModel
StorageNodes []eth.StorageNodeWithStateKeyModel StorageNodes []models.StorageNodeWithStateKeyModel
}
// ConvertedPayload is a custom type which packages raw ETH data for publishing to IPFS and filtering to subscribers
// Returned by PayloadConverter
// Passed to IPLDPublisher and ResponseFilterer
type ConvertedPayload struct {
TotalDifficulty *big.Int
Block *types.Block
TxMetaData []models.TxModel
Receipts types.Receipts
ReceiptMetaData []models.ReceiptModel
StateNodes []sdtypes.StateNode
StorageNodes map[string][]sdtypes.StorageNode
} }

View File

@ -28,20 +28,32 @@ import (
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers" "github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
"github.com/vulcanize/ipld-eth-server/pkg/graphql" "github.com/vulcanize/ipld-eth-server/pkg/graphql"
) )
// SetupDB is use to setup a db for watcher tests
func SetupDB() (*postgres.DB, error) {
uri := postgres.DbConnectionString(postgres.ConnectionParams{
User: "vdbm",
Password: "password",
Hostname: "localhost",
Name: "vulcanize_testing",
Port: 8077,
})
return postgres.NewDB(uri, postgres.ConnectionConfig{}, node.Info{})
}
var _ = Describe("GraphQL", func() { var _ = Describe("GraphQL", func() {
const ( const (
gqlEndPoint = "127.0.0.1:8083" gqlEndPoint = "127.0.0.1:8083"
@ -66,9 +78,9 @@ var _ = Describe("GraphQL", func() {
It("test init", func() { It("test init", func() {
var err error var err error
db, err = shared.SetupDB() db, err = SetupDB()
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
transformer := eth2.NewStateDiffTransformer(chainConfig, db) transformer := indexer.NewStateDiffIndexer(chainConfig, db)
backend, err = eth.NewEthBackend(db, &eth.Config{ backend, err = eth.NewEthBackend(db, &eth.Config{
ChainConfig: chainConfig, ChainConfig: chainConfig,
VmConfig: vm.Config{}, VmConfig: vm.Config{},
@ -109,35 +121,43 @@ var _ = Describe("GraphQL", func() {
var diff statediff.StateObject var diff statediff.StateObject
diff, err = builder.BuildStateDiffObject(args, params) diff, err = builder.BuildStateDiffObject(args, params)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
diffRlp, err := rlp.EncodeToBytes(diff)
tx, err := transformer.PushBlock(block, rcts, mockTD)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
blockRlp, err := rlp.EncodeToBytes(block)
Expect(err).ToNot(HaveOccurred()) for _, node := range diff.Nodes {
receiptsRlp, err := rlp.EncodeToBytes(rcts) err = transformer.PushStateNode(tx, node)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
payload := statediff.Payload{
StateObjectRlp: diffRlp,
BlockRlp: blockRlp,
ReceiptsRlp: receiptsRlp,
TotalDifficulty: mockTD,
} }
_, err = transformer.Transform(0, payload) err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
} }
// Insert some non-canonical data into the database so that we test our ability to discern canonicity // Insert some non-canonical data into the database so that we test our ability to discern canonicity
indexAndPublisher := eth2.NewIPLDPublisher(db) indexAndPublisher := indexer.NewStateDiffIndexer(chainConfig, db)
blockHash = test_helpers.MockBlock.Hash() blockHash = test_helpers.MockBlock.Hash()
contractAddress = test_helpers.ContractAddr contractAddress = test_helpers.ContractAddr
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload) tx, err := indexAndPublisher.PushBlock(test_helpers.MockBlock, test_helpers.MockReceipts, test_helpers.MockBlock.Difficulty())
Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
// The non-canonical header has a child // The non-canonical header has a child
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayloadForChild) tx, err = indexAndPublisher.PushBlock(test_helpers.MockChild, test_helpers.MockReceipts, test_helpers.MockChild.Difficulty())
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode)
ccHash := sdtypes.CodeAndCodeHash{
Hash: test_helpers.CodeHash,
Code: test_helpers.ContractCode,
}
err = indexAndPublisher.PushCodeAndCodeHash(tx, ccHash)
Expect(err).ToNot(HaveOccurred())
err = tx.Close(err)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
graphQLServer, err = graphql.New(backend, gqlEndPoint, nil, []string{"*"}, rpc.HTTPTimeouts{}) graphQLServer, err = graphql.New(backend, gqlEndPoint, nil, []string{"*"}, rpc.HTTPTimeouts{})
@ -208,23 +228,3 @@ var _ = Describe("GraphQL", func() {
}) })
}) })
}) })
func publishCode(db *postgres.DB, codeHash common.Hash, code []byte) error {
tx, err := db.Beginx()
if err != nil {
return err
}
mhKey, err := shared.MultihashKeyFromKeccak256(codeHash)
if err != nil {
_ = tx.Rollback()
return err
}
if err := shared.PublishDirect(tx, mhKey, code); err != nil {
_ = tx.Rollback()
return err
}
return tx.Commit()
}

View File

@ -20,10 +20,9 @@ import (
"context" "context"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )

View File

@ -23,15 +23,11 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipld-eth-indexer/pkg/shared"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-indexer/utils"
"github.com/vulcanize/ipld-eth-server/pkg/prom" "github.com/vulcanize/ipld-eth-server/pkg/prom"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
@ -56,7 +52,8 @@ const (
// Config struct // Config struct
type Config struct { type Config struct {
DB *postgres.DB DB *postgres.DB
DBConfig postgres.Config DBConfig postgres.ConnectionConfig
DBParams postgres.ConnectionParams
WSEnabled bool WSEnabled bool
WSEndpoint string WSEndpoint string
@ -89,17 +86,17 @@ type Config struct {
func NewConfig() (*Config, error) { func NewConfig() (*Config, error) {
c := new(Config) c := new(Config)
viper.BindEnv("ethereum.httpPath", shared.ETH_HTTP_PATH) viper.BindEnv("ethereum.httpPath", ETH_HTTP_PATH)
viper.BindEnv("ethereum.defaultSender", ETH_DEFAULT_SENDER_ADDR) viper.BindEnv("ethereum.defaultSender", ETH_DEFAULT_SENDER_ADDR)
viper.BindEnv("ethereum.rpcGasCap", ETH_RPC_GAS_CAP) viper.BindEnv("ethereum.rpcGasCap", ETH_RPC_GAS_CAP)
viper.BindEnv("ethereum.chainConfig", ETH_CHAIN_CONFIG) viper.BindEnv("ethereum.chainConfig", ETH_CHAIN_CONFIG)
viper.BindEnv("ethereum.supportsStateDiff", ETH_SUPPORTS_STATEDIFF) viper.BindEnv("ethereum.supportsStateDiff", ETH_SUPPORTS_STATEDIFF)
c.DBConfig.Init() //c.DBConfig.Init()
ethHTTP := viper.GetString("ethereum.httpPath") ethHTTP := viper.GetString("ethereum.httpPath")
ethHTTPEndpoint := fmt.Sprintf("http://%s", ethHTTP) ethHTTPEndpoint := fmt.Sprintf("http://%s", ethHTTP)
nodeInfo, cli, err := shared.GetEthNodeAndClient(ethHTTPEndpoint) nodeInfo, cli, err := getEthNodeAndClient(ethHTTPEndpoint)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -184,9 +181,9 @@ func NewConfig() (*Config, error) {
c.IpldGraphqlEnabled = ipldGraphqlEnabled c.IpldGraphqlEnabled = ipldGraphqlEnabled
overrideDBConnConfig(&c.DBConfig) overrideDBConnConfig(&c.DBConfig)
serveDB := utils.LoadPostgres(c.DBConfig, nodeInfo, false) serveDB, err := postgres.NewDB(postgres.DbConnectionString(c.DBParams), c.DBConfig, nodeInfo)
prom.RegisterDBCollector(c.DBConfig.Name, serveDB.DB) prom.RegisterDBCollector(c.DBParams.Name, serveDB.DB)
c.DB = &serveDB c.DB = serveDB
defaultSenderStr := viper.GetString("ethereum.defaultSender") defaultSenderStr := viper.GetString("ethereum.defaultSender")
if defaultSenderStr != "" { if defaultSenderStr != "" {
@ -208,7 +205,7 @@ func NewConfig() (*Config, error) {
return c, err return c, err
} }
func overrideDBConnConfig(con *postgres.Config) { func overrideDBConnConfig(con *postgres.ConnectionConfig) {
viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS) viper.BindEnv("database.server.maxIdle", SERVER_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS) viper.BindEnv("database.server.maxOpen", SERVER_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME) viper.BindEnv("database.server.maxLifetime", SERVER_MAX_CONN_LIFETIME)

41
pkg/serve/env.go Normal file
View File

@ -0,0 +1,41 @@
package serve
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/spf13/viper"
)
// Env variables
const (
HTTP_TIMEOUT = "HTTP_TIMEOUT"
ETH_WS_PATH = "ETH_WS_PATH"
ETH_HTTP_PATH = "ETH_HTTP_PATH"
ETH_NODE_ID = "ETH_NODE_ID"
ETH_CLIENT_NAME = "ETH_CLIENT_NAME"
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK"
ETH_NETWORK_ID = "ETH_NETWORK_ID"
ETH_CHAIN_ID = "ETH_CHAIN_ID"
)
// GetEthNodeAndClient returns eth node info and client from path url
func getEthNodeAndClient(path string) (node.Info, *rpc.Client, error) {
viper.BindEnv("ethereum.nodeID", ETH_NODE_ID)
viper.BindEnv("ethereum.clientName", ETH_CLIENT_NAME)
viper.BindEnv("ethereum.genesisBlock", ETH_GENESIS_BLOCK)
viper.BindEnv("ethereum.networkID", ETH_NETWORK_ID)
viper.BindEnv("ethereum.chainID", ETH_CHAIN_ID)
rpcClient, err := rpc.Dial(path)
if err != nil {
return node.Info{}, nil, err
}
return node.Info{
ID: viper.GetString("ethereum.nodeID"),
ClientName: viper.GetString("ethereum.clientName"),
GenesisBlock: viper.GetString("ethereum.genesisBlock"),
NetworkID: viper.GetString("ethereum.networkID"),
ChainID: viper.GetUint64("ethereum.chainID"),
}, rpcClient, nil
}

View File

@ -21,6 +21,7 @@ import (
"strconv" "strconv"
"sync" "sync"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/net" "github.com/vulcanize/ipld-eth-server/pkg/net"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
@ -32,10 +33,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
eth2 "github.com/vulcanize/ipld-eth-indexer/pkg/eth"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth" "github.com/vulcanize/ipld-eth-server/pkg/eth"
) )
@ -52,7 +49,7 @@ type Server interface {
APIs() []rpc.API APIs() []rpc.API
Protocols() []p2p.Protocol Protocols() []p2p.Protocol
// Pub-Sub handling event loop // Pub-Sub handling event loop
Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload)
// Method to subscribe to the service // Method to subscribe to the service
Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings) Subscribe(id rpc.ID, sub chan<- SubscriptionPayload, quitChan chan<- bool, params eth.SubscriptionSettings)
// Method to unsubscribe from the service // Method to unsubscribe from the service
@ -145,7 +142,7 @@ func (sap *Service) APIs() []rpc.API {
// It filters and sends this data to any subscribers to the service // It filters and sends this data to any subscribers to the service
// This process can also be stood up alone, without an screenAndServePayload attached to a Sync process // This process can also be stood up alone, without an screenAndServePayload attached to a Sync process
// and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only // and it will hang on the WaitGroup indefinitely, allowing the Service to serve historical data requests only
func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.ConvertedPayload) { func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth.ConvertedPayload) {
sap.serveWg = wg sap.serveWg = wg
go func() { go func() {
wg.Add(1) wg.Add(1)
@ -164,7 +161,7 @@ func (sap *Service) Serve(wg *sync.WaitGroup, screenAndServePayload <-chan eth2.
} }
// filterAndServe filters the payload according to each subscription type and sends to the subscriptions // filterAndServe filters the payload according to each subscription type and sends to the subscriptions
func (sap *Service) filterAndServe(payload eth2.ConvertedPayload) { func (sap *Service) filterAndServe(payload eth.ConvertedPayload) {
log.Debug("sending eth ipld payload to subscriptions") log.Debug("sending eth ipld payload to subscriptions")
sap.Lock() sap.Lock()
sap.serveWg.Add(1) sap.serveWg.Add(1)
@ -337,7 +334,7 @@ func (sap *Service) Unsubscribe(id rpc.ID) {
func (sap *Service) Start() error { func (sap *Service) Start() error {
log.Info("starting eth ipld server") log.Info("starting eth ipld server")
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
payloadChan := make(chan eth2.ConvertedPayload, PayloadChanBufferSize) payloadChan := make(chan eth.ConvertedPayload, PayloadChanBufferSize)
sap.Serve(wg, payloadChan) sap.Serve(wg, payloadChan)
return nil return nil
} }

View File

@ -18,13 +18,13 @@ package shared
import ( import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore" "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-ds-help" "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format" node "github.com/ipfs/go-ipld-format"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs/ipld"
) )
// HandleZeroAddrPointer will return an emtpy string for a nil address pointer // HandleZeroAddrPointer will return an emtpy string for a nil address pointer

View File

@ -19,7 +19,7 @@ package shared
import ( import (
"bytes" "bytes"
"github.com/vulcanize/ipld-eth-indexer/pkg/ipfs" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
) )
// IPLDsContainBytes used to check if a list of strings contains a particular string // IPLDsContainBytes used to check if a list of strings contains a particular string

View File

@ -19,13 +19,12 @@ package test_config
import ( import (
"errors" "errors"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"
) )
var DBConfig postgres.Config var DBConfig postgres.ConnectionParams
func init() { func init() {
setTestConfig() setTestConfig()
@ -53,7 +52,7 @@ func setTestConfig() {
port := vip.GetInt("database.port") port := vip.GetInt("database.port")
name := vip.GetString("database.name") name := vip.GetString("database.name")
DBConfig = postgres.Config{ DBConfig = postgres.ConnectionParams{
Hostname: hn, Hostname: hn,
Name: name, Name: name,
Port: port, Port: port,