Fix get log API to use log_cids table.

This commit is contained in:
Arijit Das 2021-08-20 13:07:11 +05:30
parent 3d1b308326
commit a28892f1d3
23 changed files with 553 additions and 1300 deletions

View File

@ -6,13 +6,9 @@ CREATE TABLE eth.receipt_cids (
mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
contract VARCHAR(66),
contract_hash VARCHAR(66),
topic0s VARCHAR(66)[],
topic1s VARCHAR(66)[],
topic2s VARCHAR(66)[],
topic3s VARCHAR(66)[],
log_contracts VARCHAR(66)[],
post_state VARCHAR(66),
post_status INTEGER,
log_root VARCHAR(66),
UNIQUE (tx_id)
);

View File

@ -36,16 +36,6 @@ CREATE INDEX rct_contract_index ON eth.receipt_cids USING btree (contract);
CREATE INDEX rct_contract_hash_index ON eth.receipt_cids USING btree (contract_hash);
CREATE INDEX rct_topic0_index ON eth.receipt_cids USING gin (topic0s);
CREATE INDEX rct_topic1_index ON eth.receipt_cids USING gin (topic1s);
CREATE INDEX rct_topic2_index ON eth.receipt_cids USING gin (topic2s);
CREATE INDEX rct_topic3_index ON eth.receipt_cids USING gin (topic3s);
CREATE INDEX rct_log_contract_index ON eth.receipt_cids USING gin (log_contracts);
-- state node indexes
CREATE INDEX state_header_id_index ON eth.state_cids USING btree (header_id);
@ -93,11 +83,6 @@ DROP INDEX eth.state_leaf_key_index;
DROP INDEX eth.state_header_id_index;
-- receipt indexes
DROP INDEX eth.rct_log_contract_index;
DROP INDEX eth.rct_topic3_index;
DROP INDEX eth.rct_topic2_index;
DROP INDEX eth.rct_topic1_index;
DROP INDEX eth.rct_topic0_index;
DROP INDEX eth.rct_contract_hash_index;
DROP INDEX eth.rct_contract_index;
DROP INDEX eth.rct_mh_index;

View File

@ -1,11 +1,11 @@
-- +goose Up
CREATE TABLE eth.log_cids (
id SERIAL PRIMARY KEY,
leaf_cid TEXT NOT NULL,
leaf_mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
receipt_id INTEGER NOT NULL REFERENCES eth.receipt_cids (id) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
address VARCHAR(66),
cid TEXT NOT NULL,
address VARCHAR(66) NOT NULL,
log_data BYTEA,
mh_key TEXT NOT NULL REFERENCES public.blocks (key) ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
index INTEGER NOT NULL,
topic0 VARCHAR(66),
topic1 VARCHAR(66),
@ -14,20 +14,12 @@ CREATE TABLE eth.log_cids (
UNIQUE (receipt_id, index)
);
ALTER TABLE eth.receipt_cids
DROP COLUMN topic0s,
DROP COLUMN topic1s,
DROP COLUMN topic2s,
DROP COLUMN topic3s,
DROP COLUMN log_contracts,
ADD COLUMN log_root VARCHAR(66);
CREATE INDEX log_mh_index ON eth.log_cids USING btree (leaf_mh_key);
CREATE INDEX log_cid_index ON eth.log_cids USING btree (leaf_cid);
CREATE INDEX log_rct_id_index ON eth.log_cids USING btree (receipt_id);
CREATE INDEX log_mh_index ON eth.log_cids USING btree (mh_key);
CREATE INDEX log_cid_index ON eth.log_cids USING btree (cid);
--
-- Name: log_topic0_index; Type: INDEX; Schema: eth; Owner: -
--
@ -57,4 +49,13 @@ CREATE INDEX log_topic3_index ON eth.log_cids USING btree (topic3);
-- +goose Down
DROP TABLE eth.logs;
-- log indexes
DROP INDEX eth.log_mh_index;
DROP INDEX eth.log_cid_index;
DROP INDEX eth.log_rct_id_index;
DROP INDEX eth.log_topic0_index;
DROP INDEX eth.log_topic1_index;
DROP INDEX eth.log_topic2_index;
DROP INDEX eth.log_topic3_index;
DROP TABLE eth.log_cids;

View File

@ -5,7 +5,7 @@ services:
restart: unless-stopped
depends_on:
- statediff-migrations
image: vulcanize/dapptools:v0.29.0-v1.10.2-statediff-0.0.19
image: vulcanize/dapptools:v0.29.0-v1.10.8-statediff-0.0.26
environment:
DB_USER: vdbm
DB_NAME: vulcanize_public
@ -20,7 +20,7 @@ services:
restart: on-failure
depends_on:
- db
image: vulcanize/statediff-migrations:v0.4.0
image: vulcanize/statediff-migrations:v0.6.0
environment:
DATABASE_USER: vdbm
DATABASE_NAME: vulcanize_public

9
go.mod
View File

@ -3,7 +3,7 @@ module github.com/vulcanize/ipld-eth-server
go 1.13
require (
github.com/ethereum/go-ethereum v1.10.6
github.com/ethereum/go-ethereum v1.10.8
github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.7
@ -13,6 +13,7 @@ require (
github.com/jmoiron/sqlx v1.2.0
github.com/lib/pq v1.10.2
github.com/machinebox/graphql v0.2.2
github.com/matryer/is v1.4.0 // indirect
github.com/multiformats/go-multihash v0.0.14
github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.10.1
@ -23,9 +24,9 @@ require (
github.com/spf13/viper v1.7.0
github.com/tklauser/go-sysconf v0.3.6 // indirect
github.com/vulcanize/gap-filler v0.3.1
github.com/vulcanize/ipfs-ethdb v0.0.2-alpha
github.com/vulcanize/ipfs-ethdb v0.0.4-0.20210824131459-7bb49801fc12
)
replace github.com/ethereum/go-ethereum v1.10.6 => github.com/vulcanize/go-ethereum v1.10.6-statediff-0.0.25.0.20210816125050-7b20e5918be3
replace github.com/ethereum/go-ethereum v1.10.8 => github.com/vulcanize/go-ethereum v1.10.8-statediff-0.0.26
replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha
replace github.com/vulcanize/ipfs-ethdb v0.0.2-alpha => github.com/vulcanize/pg-ipfs-ethdb v0.0.2-alpha

978
go.sum

File diff suppressed because it is too large Load Diff

View File

@ -24,7 +24,6 @@ import (
"fmt"
"io"
"math/big"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
@ -94,6 +93,7 @@ func (pea *PublicEthAPI) GetHeaderByNumber(ctx context.Context, number rpc.Block
return pea.rpcMarshalHeader(header)
}
}
return nil, err
}
@ -105,6 +105,7 @@ func (pea *PublicEthAPI) GetHeaderByHash(ctx context.Context, hash common.Hash)
return res
}
}
if pea.ethClient != nil {
if header, err := pea.ethClient.HeaderByHash(ctx, hash); header != nil && err == nil {
go pea.writeStateDiffFor(hash)
@ -113,17 +114,25 @@ func (pea *PublicEthAPI) GetHeaderByHash(ctx context.Context, hash common.Hash)
}
}
}
return nil
}
// rpcMarshalHeader uses the generalized output filler, then adds the total difficulty field
func (pea *PublicEthAPI) rpcMarshalHeader(header *types.Header) (map[string]interface{}, error) {
fields := RPCMarshalHeader(header)
var extractMiner bool
if pea.B.Config.ChainConfig.Clique != nil {
extractMiner = true
}
fields := RPCMarshalHeader(header, extractMiner)
td, err := pea.B.GetTd(header.Hash())
if err != nil {
return nil, err
}
fields["totalDifficulty"] = (*hexutil.Big)(td)
return fields, nil
}
@ -143,12 +152,14 @@ func (pea *PublicEthAPI) GetBlockByNumber(ctx context.Context, number rpc.BlockN
if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
if pea.ethClient != nil {
if block, err := pea.ethClient.BlockByNumber(ctx, big.NewInt(number.Int64())); block != nil && err == nil {
go pea.writeStateDiffAt(number.Int64())
return pea.rpcMarshalBlock(block, true, fullTx)
}
}
return nil, err
}
@ -159,12 +170,14 @@ func (pea *PublicEthAPI) GetBlockByHash(ctx context.Context, hash common.Hash, f
if block != nil && err == nil {
return pea.rpcMarshalBlock(block, true, fullTx)
}
if pea.ethClient != nil {
if block, err := pea.ethClient.BlockByHash(ctx, hash); block != nil && err == nil {
go pea.writeStateDiffFor(hash)
return pea.rpcMarshalBlock(block, true, fullTx)
}
}
return nil, err
}
@ -177,9 +190,11 @@ func (pea *PublicEthAPI) ChainId() hexutil.Uint64 {
return 0
}
if config := pea.B.Config.ChainConfig; config.IsEIP155(block.Number()) {
chainID = config.ChainID
}
return (hexutil.Uint64)(chainID.Uint64())
}
@ -202,12 +217,14 @@ func (pea *PublicEthAPI) GetUncleByBlockNumberAndIndex(ctx context.Context, bloc
block = types.NewBlockWithHeader(uncles[index])
return pea.rpcMarshalBlock(block, false, false)
}
if pea.rpc != nil {
if uncle, uncleHashes, err := getBlockAndUncleHashes(pea.rpc, ctx, "eth_getUncleByBlockNumberAndIndex", blockNr, index); uncle != nil && err == nil {
go pea.writeStateDiffAt(blockNr.Int64())
return pea.rpcMarshalBlockWithUncleHashes(uncle, uncleHashes, false, false)
}
}
return nil, err
}
@ -224,12 +241,14 @@ func (pea *PublicEthAPI) GetUncleByBlockHashAndIndex(ctx context.Context, blockH
block = types.NewBlockWithHeader(uncles[index])
return pea.rpcMarshalBlock(block, false, false)
}
if pea.rpc != nil {
if uncle, uncleHashes, err := getBlockAndUncleHashes(pea.rpc, ctx, "eth_getUncleByBlockHashAndIndex", blockHash, index); uncle != nil && err == nil {
go pea.writeStateDiffFor(blockHash)
return pea.rpcMarshalBlockWithUncleHashes(uncle, uncleHashes, false, false)
}
}
return nil, err
}
@ -239,6 +258,7 @@ func (pea *PublicEthAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNr
n := hexutil.Uint(len(block.Uncles()))
return &n
}
if pea.rpc != nil {
var num *hexutil.Uint
if err := pea.rpc.CallContext(ctx, &num, "eth_getUncleCountByBlockNumber", blockNr); num != nil && err == nil {
@ -246,6 +266,7 @@ func (pea *PublicEthAPI) GetUncleCountByBlockNumber(ctx context.Context, blockNr
return num
}
}
return nil
}
@ -255,6 +276,7 @@ func (pea *PublicEthAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash
n := hexutil.Uint(len(block.Uncles()))
return &n
}
if pea.rpc != nil {
var num *hexutil.Uint
if err := pea.rpc.CallContext(ctx, &num, "eth_getUncleCountByBlockHash", blockHash); num != nil && err == nil {
@ -262,6 +284,7 @@ func (pea *PublicEthAPI) GetUncleCountByBlockHash(ctx context.Context, blockHash
return num
}
}
return nil
}
@ -277,6 +300,7 @@ func (pea *PublicEthAPI) GetTransactionCount(ctx context.Context, address common
if count != nil && err == nil {
return count, nil
}
if pea.rpc != nil {
var num *hexutil.Uint64
if err := pea.rpc.CallContext(ctx, &num, "eth_getTransactionCount", address, blockNrOrHash); num != nil && err == nil {
@ -284,6 +308,7 @@ func (pea *PublicEthAPI) GetTransactionCount(ctx context.Context, address common
return num, nil
}
}
return nil, err
}
@ -292,6 +317,7 @@ func (pea *PublicEthAPI) localGetTransactionCount(ctx context.Context, address c
if err != nil {
return nil, err
}
nonce := hexutil.Uint64(account.Nonce)
return &nonce, nil
}
@ -302,6 +328,7 @@ func (pea *PublicEthAPI) GetBlockTransactionCountByNumber(ctx context.Context, b
n := hexutil.Uint(len(block.Transactions()))
return &n
}
if pea.rpc != nil {
var num *hexutil.Uint
if err := pea.rpc.CallContext(ctx, &num, "eth_getBlockTransactionCountByNumber", blockNr); num != nil && err == nil {
@ -309,6 +336,7 @@ func (pea *PublicEthAPI) GetBlockTransactionCountByNumber(ctx context.Context, b
return num
}
}
return nil
}
@ -318,6 +346,7 @@ func (pea *PublicEthAPI) GetBlockTransactionCountByHash(ctx context.Context, blo
n := hexutil.Uint(len(block.Transactions()))
return &n
}
if pea.rpc != nil {
var num *hexutil.Uint
if err := pea.rpc.CallContext(ctx, &num, "eth_getBlockTransactionCountByHash", blockHash); num != nil && err == nil {
@ -325,6 +354,7 @@ func (pea *PublicEthAPI) GetBlockTransactionCountByHash(ctx context.Context, blo
return num
}
}
return nil
}
@ -333,6 +363,7 @@ func (pea *PublicEthAPI) GetTransactionByBlockNumberAndIndex(ctx context.Context
if block, _ := pea.B.BlockByNumber(ctx, blockNr); block != nil {
return newRPCTransactionFromBlockIndex(block, uint64(index))
}
if pea.rpc != nil {
var tx *RPCTransaction
if err := pea.rpc.CallContext(ctx, &tx, "eth_getTransactionByBlockNumberAndIndex", blockNr, index); tx != nil && err == nil {
@ -340,6 +371,7 @@ func (pea *PublicEthAPI) GetTransactionByBlockNumberAndIndex(ctx context.Context
return tx
}
}
return nil
}
@ -348,6 +380,7 @@ func (pea *PublicEthAPI) GetTransactionByBlockHashAndIndex(ctx context.Context,
if block, _ := pea.B.BlockByHash(ctx, blockHash); block != nil {
return newRPCTransactionFromBlockIndex(block, uint64(index))
}
if pea.rpc != nil {
var tx *RPCTransaction
if err := pea.rpc.CallContext(ctx, &tx, "eth_getTransactionByBlockHashAndIndex", blockHash, index); tx != nil && err == nil {
@ -355,6 +388,7 @@ func (pea *PublicEthAPI) GetTransactionByBlockHashAndIndex(ctx context.Context,
return tx
}
}
return nil
}
@ -587,62 +621,14 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
}
}()
// If we have a blockhash to filter on, fire off single retrieval query
// If we have a blockHash to filter on, fire off single retrieval query
if crit.BlockHash != nil {
rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, 0, crit.BlockHash, nil)
filteredLogs, err := pea.B.Retriever.RetrieveFilteredLog(tx, filter, 0, crit.BlockHash)
if err != nil {
return nil, err
}
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs)
if err != nil {
return nil, err
}
rcptIDs := make([]int64, len(rctCIDs))
txnIDs := make([]int64, len(rctCIDs))
for idx, v := range rctCIDs {
rcptIDs[idx] = v.ID
txnIDs[idx] = v.TxID
}
logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs)
if err != nil {
return nil, err
}
logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs)
if err != nil {
return nil, err
}
txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs)
if err != nil {
return nil, err
}
txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs)
if err != nil {
return nil, err
}
// TODO: write query for Retrieving block Number by hash
header, err := pea.B.Retriever.RetrieveHeaderCIDByHash(tx, *crit.BlockHash)
if err != nil {
return nil, err
}
if err = tx.Commit(); err != nil {
return nil, err
}
// TODO: should we convert string to uint ?
blockNumber, err := strconv.ParseUint(header.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
return extractLogsOfInterest(pea.B.Config.ChainConfig, *crit.BlockHash, blockNumber, rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs)
return pea.B.Fetcher.FetchLogs(filteredLogs)
}
// Otherwise, create block range from criteria
@ -664,50 +650,19 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
start := startingBlock.Int64()
end := endingBlock.Int64()
var logs []*types.Log
for i := start; i <= end; i++ {
rctCIDs, err := pea.B.Retriever.RetrieveRctCIDs(tx, filter, i, nil, nil)
for i := start; i <= end; {
filteredLog, err := pea.B.Retriever.RetrieveFilteredLog(tx, filter, i, nil)
if err != nil {
return nil, err
}
rctIPLDs, err := pea.B.Fetcher.FetchRcts(tx, rctCIDs)
logCIDs, err := pea.B.Fetcher.FetchLogs(filteredLog)
if err != nil {
return nil, err
}
rcptIDs := make([]int64, len(rctCIDs))
txnIDs := make([]int64, len(rctCIDs))
for idx, v := range rctCIDs {
rcptIDs[idx] = v.ID
txnIDs[idx] = v.TxID
}
logCIDs, err := pea.B.Retriever.RetrieveLogCID(tx, rcptIDs)
if err != nil {
return nil, err
}
logIPLDs, err := pea.B.Fetcher.FetchLogs(tx, logCIDs)
if err != nil {
return nil, err
}
txnCIDs, err := pea.B.Retriever.RetrieveTxCIDsByReceipt(tx, txnIDs)
if err != nil {
return nil, err
}
txnIPLDs, err := pea.B.Fetcher.FetchTrxs(tx, txnCIDs)
if err != nil {
return nil, err
}
canonicalHash, err := pea.B.GetCanonicalHash(uint64(i))
if err != nil {
return nil, err
}
return extractLogsOfInterest(pea.B.Config.ChainConfig, canonicalHash, uint64(i), rctCIDs, txnIPLDs, rctIPLDs, logIPLDs, txnCIDs)
logs = append(logs, logCIDs...)
i++
}
if err := tx.Commit(); err != nil {
@ -1130,7 +1085,12 @@ func (pea *PublicEthAPI) writeStateDiffFor(blockHash common.Hash) {
// rpcMarshalBlock uses the generalized output filler, then adds the total difficulty field
func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields, err := RPCMarshalBlock(b, inclTx, fullTx)
var extractMiner bool
if pea.B.Config.ChainConfig.Clique != nil {
extractMiner = true
}
fields, err := RPCMarshalBlock(b, inclTx, fullTx, extractMiner)
if err != nil {
return nil, err
}
@ -1146,7 +1106,12 @@ func (pea *PublicEthAPI) rpcMarshalBlock(b *types.Block, inclTx bool, fullTx boo
// rpcMarshalBlockWithUncleHashes uses the generalized output filler, then adds the total difficulty field
func (pea *PublicEthAPI) rpcMarshalBlockWithUncleHashes(b *types.Block, uncleHashes []common.Hash, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields, err := RPCMarshalBlockWithUncleHashes(b, uncleHashes, inclTx, fullTx)
var extractMiner bool
if pea.B.Config.ChainConfig.Clique != nil {
extractMiner = true
}
fields, err := RPCMarshalBlockWithUncleHashes(b, uncleHashes, inclTx, fullTx, extractMiner)
if err != nil {
return nil, err
}

View File

@ -20,6 +20,7 @@ import (
"context"
"math/big"
"strconv"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -37,6 +38,7 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
)
@ -214,6 +216,11 @@ var _ = Describe("API", func() {
ChainConfig: chainConfig,
VmConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
CacheConfig: pgipfsethdb.CacheConfig{
Name: "api_test",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
})
Expect(err).ToNot(HaveOccurred())
api = eth.NewPublicEthAPI(backend, nil, false)
@ -645,7 +652,13 @@ var _ = Describe("API", func() {
crit := filters.FilterCriteria{
Topics: [][]common.Hash{
{
common.HexToHash("0x04"),
common.HexToHash("0x0c"),
},
{
common.HexToHash("0x0a"),
},
{
common.HexToHash("0x0b"),
},
},
FromBlock: test_helpers.MockBlock.Number(),
@ -653,6 +666,58 @@ var _ = Describe("API", func() {
}
logs, err := api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(0))
crit = filters.FilterCriteria{
Topics: [][]common.Hash{
{
common.HexToHash("0x08"),
},
{
common.HexToHash("0x0a"),
},
{
common.HexToHash("0x0c"),
},
},
FromBlock: test_helpers.MockBlock.Number(),
ToBlock: test_helpers.MockBlock.Number(),
}
logs, err = api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(0))
crit = filters.FilterCriteria{
Topics: [][]common.Hash{
{
common.HexToHash("0x09"),
},
{
common.HexToHash("0x0a"),
},
{
common.HexToHash("0x0b"),
},
},
FromBlock: test_helpers.MockBlock.Number(),
ToBlock: test_helpers.MockBlock.Number(),
}
logs, err = api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(1))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog4}))
crit = filters.FilterCriteria{
Topics: [][]common.Hash{
{
common.HexToHash("0x04"),
},
},
FromBlock: test_helpers.MockBlock.Number(),
ToBlock: test_helpers.MockBlock.Number(),
}
logs, err = api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(1))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog1}))
@ -810,8 +875,8 @@ var _ = Describe("API", func() {
}
logs, err = api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(2))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog1, test_helpers.MockLog2}))
Expect(len(logs)).To(Equal(5))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog1, test_helpers.MockLog2, test_helpers.MockLog3, test_helpers.MockLog4, test_helpers.MockLog5}))
})
It("Uses the provided blockhash if one is provided", func() {
@ -946,8 +1011,8 @@ var _ = Describe("API", func() {
}
logs, err = api.GetLogs(ctx, crit)
Expect(err).ToNot(HaveOccurred())
Expect(len(logs)).To(Equal(2))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog1, test_helpers.MockLog2}))
Expect(len(logs)).To(Equal(5))
Expect(logs).To(Equal([]*types.Log{test_helpers.MockLog1, test_helpers.MockLog2, test_helpers.MockLog3, test_helpers.MockLog4, test_helpers.MockLog5}))
})
It("Filters on contract address if any are provided", func() {

View File

@ -41,10 +41,10 @@ import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/trie"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
ipfsethdb "github.com/vulcanize/ipfs-ethdb"
)
var (
@ -52,6 +52,10 @@ var (
errNegativeBlockNumber = errors.New("negative block number not supported")
errHeaderHashNotFound = errors.New("header for hash not found")
errHeaderNotFound = errors.New("header not found")
// errMissingSignature is returned if a block's extra-data section doesn't seem
// to contain a 65 byte secp256k1 signature.
errMissingSignature = errors.New("extra-data 65 byte signature suffix missing")
)
const (
@ -101,11 +105,13 @@ type Config struct {
VmConfig vm.Config
DefaultSender *common.Address
RPCGasCap *big.Int
CacheConfig pgipfsethdb.CacheConfig
}
func NewEthBackend(db *postgres.DB, c *Config) (*Backend, error) {
r := NewCIDRetriever(db)
ethDB := ipfsethdb.NewDatabase(db.DB)
ethDB := pgipfsethdb.NewDatabase(db.DB, c.CacheConfig)
return &Backend{
DB: db,
Retriever: r,

View File

@ -26,18 +26,23 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/consensus/clique"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
// RPCMarshalHeader converts the given header to the RPC output.
// This function is eth/internal so we have to make our own version here...
func RPCMarshalHeader(head *types.Header) map[string]interface{} {
func RPCMarshalHeader(head *types.Header, extractMiner bool) map[string]interface{} {
if extractMiner {
err := recoverMiner(head)
if err != nil {
return nil
}
}
headerMap := map[string]interface{}{
"number": (*hexutil.Big)(head.Number),
"hash": head.Hash(),
@ -67,8 +72,8 @@ func RPCMarshalHeader(head *types.Header) map[string]interface{} {
// RPCMarshalBlock converts the given block to the RPC output which depends on fullTx. If inclTx is true transactions are
// returned. When fullTx is true the returned block contains full transaction details, otherwise it will only contain
// transaction hashes.
func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields := RPCMarshalHeader(block.Header())
func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool, extractMiner bool) (map[string]interface{}, error) {
fields := RPCMarshalHeader(block.Header(), extractMiner)
fields["size"] = hexutil.Uint64(block.Size())
if inclTx {
@ -101,8 +106,8 @@ func RPCMarshalBlock(block *types.Block, inclTx bool, fullTx bool) (map[string]i
}
// RPCMarshalBlockWithUncleHashes marshals the block with the provided uncle hashes
func RPCMarshalBlockWithUncleHashes(block *types.Block, uncleHashes []common.Hash, inclTx bool, fullTx bool) (map[string]interface{}, error) {
fields := RPCMarshalHeader(block.Header())
func RPCMarshalBlockWithUncleHashes(block *types.Block, uncleHashes []common.Hash, inclTx bool, fullTx bool, extractMiner bool) (map[string]interface{}, error) {
fields := RPCMarshalHeader(block.Header(), extractMiner)
fields["size"] = hexutil.Uint64(block.Size())
if inclTx {
@ -124,8 +129,8 @@ func RPCMarshalBlockWithUncleHashes(block *types.Block, uncleHashes []common.Has
}
fields["transactions"] = transactions
}
fields["uncles"] = uncleHashes
fields["uncles"] = uncleHashes
return fields, nil
}
@ -272,171 +277,6 @@ func newRPCTransactionFromBlockIndex(b *types.Block, index uint64) *RPCTransacti
return NewRPCTransaction(txs[index], b.Hash(), b.NumberU64(), index, b.BaseFee())
}
// extractLogsOfInterest returns logs from the receipt IPLD
func extractLogsOfInterest(config *params.ChainConfig, blockHash common.Hash, blockNumber uint64,
rctCIDs []models.ReceiptModel, txnIPLDs, rctIPLDs []ipfs.BlockModel, logIPLDs map[int64]map[int64]ipfs.BlockModel,
txnCIDs []models.TxModel) ([]*types.Log, error) {
receipts := make(types.Receipts, len(rctIPLDs))
for k, rctBytes := range rctIPLDs {
rct := new(types.Receipt)
if err := rct.UnmarshalBinary(rctBytes.Data); err != nil {
return nil, err
}
if logModels, ok := logIPLDs[rctCIDs[k].ID]; ok {
idx := 0 // TODO: check if this is good way use index for log
for logIdx, v := range logModels {
l := &types.Log{}
err := rlp.DecodeBytes(v.Data, l)
if err != nil {
return nil, err
}
l.Index = uint(logIdx)
rct.Logs[idx] = l
idx++
}
}
receipts[k] = rct
}
txns := make(types.Transactions, len(txnIPLDs))
for idx, txnBytes := range txnIPLDs {
txn := new(types.Transaction)
if err := txn.UnmarshalBinary(txnBytes.Data); err != nil {
return nil, err
}
txns[idx] = txn
}
receipts, err := deriveFields(receipts, config, blockHash, blockNumber, txns, txnCIDs)
if err != nil {
return nil, err
}
var logs []*types.Log
for _, r := range receipts {
logs = append(logs, r.Logs...)
}
return logs, nil
}
func deriveFields(rs types.Receipts, config *params.ChainConfig, hash common.Hash, number uint64, txs types.Transactions,
txnCIDs []models.TxModel) (types.Receipts, error) {
signer := types.MakeSigner(config, new(big.Int).SetUint64(number))
for i := 0; i < len(rs); i++ {
// The transaction type and hash can be retrieved from the transaction itself
rs[i].Type = txs[i].Type()
rs[i].TxHash = txs[i].Hash()
// block location fields
rs[i].BlockHash = hash
rs[i].BlockNumber = new(big.Int).SetUint64(number)
rs[i].TransactionIndex = uint(txnCIDs[i].Index)
// The contract address can be derived from the transaction itself
if txs[i].To() == nil {
// Deriving the signer is expensive, only do if it's actually needed
from, _ := types.Sender(signer, txs[i])
rs[i].ContractAddress = crypto.CreateAddress(from, txs[i].Nonce())
}
// The used gas can be calculated based on previous r
if i == 0 {
rs[i].GasUsed = rs[i].CumulativeGasUsed
} else {
rs[i].GasUsed = rs[i].CumulativeGasUsed - rs[i-1].CumulativeGasUsed
}
for j := 0; j < len(rs[i].Logs); j++ {
rs[i].Logs[j].BlockNumber = number
rs[i].Logs[j].BlockHash = hash
rs[i].Logs[j].TxHash = rs[i].TxHash
rs[i].Logs[j].TxIndex = uint(txnCIDs[i].Index)
}
}
return rs, nil
}
func includes(addresses []common.Address, a common.Address) bool {
for _, addr := range addresses {
if addr == a {
return true
}
}
return false
}
// filterLogs creates a slice of logs matching the given criteria.
func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
var ret []*types.Log
Logs:
for _, log := range logs {
if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
continue
}
if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
continue
}
if len(addresses) > 0 && !includes(addresses, log.Address) {
continue
}
// If the to filtered topics is greater than the amount of topics in logs, skip.
if len(topics) > len(log.Topics) {
continue
}
for i, sub := range topics {
match := len(sub) == 0 // empty rule set == wildcard
for _, topic := range sub {
if log.Topics[i] == topic {
match = true
break
}
}
if !match {
continue Logs
}
}
ret = append(ret, log)
}
return ret
}
// returns true if the log matches on the filter
func wantedLog(wantedTopics [][]string, actualTopics []common.Hash) bool {
// actualTopics will always have length <= 4
// wantedTopics will always have length 4
matches := 0
for i, actualTopic := range actualTopics {
// If we have topics in this filter slot, count as a match if the actualTopic matches one of the ones in this filter slot
if len(wantedTopics[i]) > 0 {
matches += sliceContainsHash(wantedTopics[i], actualTopic)
} else {
// Filter slot is empty, not matching any topics at this slot => counts as a match
matches++
}
}
if matches == len(actualTopics) {
return true
}
return false
}
// returns 1 if the slice contains the hash, 0 if it does not
func sliceContainsHash(slice []string, hash common.Hash) int {
for _, str := range slice {
if str == hash.String() {
return 1
}
}
return 0
}
func toFilterArg(q ethereum.FilterQuery) (interface{}, error) {
arg := map[string]interface{}{
"address": q.Addresses,
@ -464,3 +304,24 @@ func toBlockNumArg(number *big.Int) string {
}
return hexutil.EncodeBig(number)
}
func recoverMiner(header *types.Header) error {
// Retrieve the signature from the header extra-data
if len(header.Extra) < crypto.SignatureLength {
return errMissingSignature
}
signature := header.Extra[len(header.Extra)-crypto.SignatureLength:]
// Recover the public key and the Ethereum address
pubkey, err := crypto.Ecrecover(clique.SealHash(header).Bytes(), signature)
if err != nil {
return err
}
var signer common.Address
copy(signer[:], crypto.Keccak256(pubkey[1:])[12:])
header.Coinbase = signer
return nil
}

View File

@ -88,7 +88,7 @@ func (ecr *CIDRetriever) Retrieve(filter SubscriptionSettings, blockNumber int64
var headers []models.HeaderModel
headers, err = ecr.RetrieveHeaderCIDs(tx, blockNumber)
if err != nil {
log.Error("header cid retrieval error")
log.Error("header cid retrieval error", err)
return nil, true, err
}
cws := make([]CIDWrapper, len(headers))
@ -227,6 +227,23 @@ func topicFilterCondition(id int, topics [][]string, args []interface{}, pgStr s
return pgStr, args, id
}
func logFilterCondition(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter) (string, []interface{}, int) {
if len(rctFilter.LogAddresses) > 0 {
pgStr += fmt.Sprintf(` AND eth.log_cids.address = ANY ($%d)`, id)
args = append(args, pq.Array(rctFilter.LogAddresses))
id++
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
}
} else if hasTopics(rctFilter.Topics) {
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
}
return pgStr, args, id
}
func receiptFilterConditions(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter, trxIds []int64) (string, []interface{}, int) {
rctCond := " AND (receipt_cids.id = ANY ( "
logQuery := "SELECT receipt_id FROM eth.log_cids WHERE"
@ -272,24 +289,6 @@ func receiptFilterConditions(id int, pgStr string, args []interface{}, rctFilter
return pgStr, args, id
}
// RetrieveLogCID retrieves and returns all of the log cids at the provided receipt IDs
func (ecr *CIDRetriever) RetrieveLogCID(tx *sqlx.Tx, rcptIDs []int64) ([]models.LogsModel, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := `SELECT eth.log_cids.id, eth.log_cids.index, eth.log_cids.receipt_id, eth.log_cids.address, eth.log_cids.cid, eth.log_cids.mh_key,
eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3
FROM eth.log_cids WHERE eth.log_cids.receipt_id = ANY ( $1 )
ORDER BY log_cids.index`
args = append(args, pq.Array(rcptIDs))
logCIDs := make([]models.LogsModel, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
}
return logCIDs, nil
}
// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter ReceiptFilter, headerID int64, trxIds []int64) ([]models.ReceiptModel, error) {
@ -310,13 +309,79 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
return receiptCids, tx.Select(&receiptCids, pgStr, args...)
}
// RetrieveFilteredGQLLogs retrieves and returns all the log cIDs provided blockHash that conform to the provided
// filter parameters.
func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockHash *common.Hash) ([]customLog, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
id := 1
pgStr := `SELECT eth.log_cids.index, eth.log_cids.receipt_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.log_cids.log_data, eth.transaction_cids.tx_hash, data, eth.receipt_cids.cid
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids, public.blocks
WHERE eth.log_cids.receipt_id = receipt_cids.id
AND receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND receipt_cids.mh_key = blocks.key AND header_cids.block_hash = $1`
args = append(args, blockHash.String())
id++
pgStr, args, id = logFilterCondition(id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`
logCIDs := make([]customLog, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
}
return logCIDs, nil
}
// RetrieveFilteredLog retrieves and returns all the log cIDs provided blockHeight or blockHash that conform to the provided
// filter parameters.
func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash) ([]customLog, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := `SELECT eth.log_cids.id,eth.log_cids.leaf_cid, eth.log_cids.index, eth.log_cids.receipt_id,
eth.log_cids.address, eth.log_cids.topic0, eth.log_cids.topic1, eth.log_cids.topic2, eth.log_cids.topic3,
eth.log_cids.log_data, eth.transaction_cids.tx_hash, eth.transaction_cids.index as txn_index,
header_cids.block_hash, header_cids.block_number
FROM eth.log_cids, eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE eth.log_cids.receipt_id = receipt_cids.id
AND receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id`
id := 1
if blockNumber > 0 {
pgStr += fmt.Sprintf(` AND header_cids.block_number = $%d`, id)
args = append(args, blockNumber)
id++
}
if blockHash != nil {
pgStr += fmt.Sprintf(` AND header_cids.block_hash = $%d`, id)
args = append(args, blockHash.String())
id++
}
pgStr, args, id = logFilterCondition(id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`
logCIDs := make([]customLog, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
}
return logCIDs, nil
}
// RetrieveRctCIDs retrieves and returns all of the rct cids at the provided blockheight or block hash that conform to the provided
// filter parameters and correspond to the provided tx ids
func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash, trxIds []int64) ([]models.ReceiptModel, error) {
log.Debug("retrieving receipt cids for block ", blockNumber)
args := make([]interface{}, 0, 5)
pgStr := `SELECT receipt_cids.id, receipt_cids.tx_id, receipt_cids.cid, receipt_cids.mh_key,
receipt_cids.contract, receipt_cids.contract_hash,receipt_cids.log_root
pgStr := `SELECT receipt_cids.id, receipt_cids.cid, receipt_cids.mh_key, receipt_cids.tx_id
FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id`

View File

@ -21,6 +21,7 @@ import (
"context"
"io/ioutil"
"math/big"
"time"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
@ -36,6 +37,7 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
@ -80,6 +82,11 @@ var _ = Describe("eth state reading tests", func() {
ChainConfig: chainConfig,
VmConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000), // Max gas capacity for a rpc call.
CacheConfig: pgipfsethdb.CacheConfig{
Name: "eth_state",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
})
Expect(err).ToNot(HaveOccurred())
api = eth.NewPublicEthAPI(backend, nil, false)

View File

@ -20,8 +20,10 @@ import (
"errors"
"fmt"
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/jmoiron/sqlx"
@ -166,31 +168,90 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs
return rctIPLDs, nil
}
// FetchLogs fetches logs
func (f *IPLDFetcher) FetchLogs(tx *sqlx.Tx, logCIDs []models.LogsModel) (map[int64]map[int64]ipfs.BlockModel, error) {
log.Debug("fetching log iplds")
// FetchLogs fetches logs.
func (f *IPLDFetcher) FetchLogs(logCIDs []customLog) ([]*types.Log, error) {
log.Debug("fetching logs")
// receipt id and log index as key to store log IPLD at log index inside receipt.
logIPLDs := make(map[int64]map[int64]ipfs.BlockModel, len(logCIDs))
for _, l := range logCIDs {
logBytes, err := shared.FetchIPLDByMhKey(tx, l.MhKey)
logs := make([]*types.Log, len(logCIDs))
for i, l := range logCIDs {
topics := make([]common.Hash, 0)
if l.Topic0 != "" {
topics = append(topics, common.HexToHash(l.Topic0))
}
if l.Topic1 != "" {
topics = append(topics, common.HexToHash(l.Topic1))
}
if l.Topic2 != "" {
topics = append(topics, common.HexToHash(l.Topic2))
}
if l.Topic3 != "" {
topics = append(topics, common.HexToHash(l.Topic3))
}
// TODO: should we convert string to uint ?
blockNum, err := strconv.ParseUint(l.BlockNumber, 10, 64)
if err != nil {
return nil, err
}
if v, ok := logIPLDs[l.ReceiptID]; ok {
v[l.Index] = ipfs.BlockModel{
Data: logBytes,
CID: l.CID,
}
continue
}
logIPLDs[l.ReceiptID] = map[int64]ipfs.BlockModel{
l.Index: {Data: logBytes, CID: l.CID},
logs[i] = &types.Log{
Address: common.HexToAddress(l.Address),
Topics: topics,
Data: l.Data,
BlockNumber: blockNum,
TxHash: common.HexToHash(l.TxHash),
TxIndex: uint(l.TxnIndex),
BlockHash: common.HexToHash(l.BlockHash),
Index: uint(l.Index),
Removed: false, // TODO: check where to get this value
}
}
return logIPLDs, nil
return logs, nil
}
type logsCID struct {
Log *types.Log
CID string
RctCID string
RctData []byte
}
// FetchGQLLogs fetches logs for graphql.
func (f *IPLDFetcher) FetchGQLLogs(logCIDs []customLog) ([]logsCID, error) {
log.Debug("fetching logs")
logs := make([]logsCID, len(logCIDs))
for i, l := range logCIDs {
topics := make([]common.Hash, 0)
if l.Topic0 != "" {
topics = append(topics, common.HexToHash(l.Topic0))
}
if l.Topic1 != "" {
topics = append(topics, common.HexToHash(l.Topic1))
}
if l.Topic2 != "" {
topics = append(topics, common.HexToHash(l.Topic2))
}
if l.Topic3 != "" {
topics = append(topics, common.HexToHash(l.Topic3))
}
logs[i] = logsCID{
Log: &types.Log{
Address: common.HexToAddress(l.Address),
Topics: topics,
Data: l.Data,
Index: uint(l.Index),
TxHash: common.HexToHash(l.TxHash),
},
CID: l.LeafCID,
RctCID: l.RctCID,
RctData: l.RctData,
}
}
return logs, nil
}
// FetchState fetches state nodes

View File

@ -39,6 +39,8 @@ func TearDownDB(db *postgres.DB) {
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM blocks`)
Expect(err).NotTo(HaveOccurred())
_, err = tx.Exec(`DELETE FROM eth.log_cids`)
Expect(err).NotTo(HaveOccurred())
err = tx.Commit()
Expect(err).NotTo(HaveOccurred())

View File

@ -96,6 +96,7 @@ var (
MockChildRlp, _ = rlp.EncodeToBytes(MockChild.Header())
Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592")
AnotherAddress = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476593")
AnotherAddress1 = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476594")
ContractAddress = crypto.CreateAddress(SenderAddr, MockTransactions[2].Nonce())
ContractHash = crypto.Keccak256Hash(ContractAddress.Bytes()).String()
MockContractByteCode = []byte{0, 1, 2, 3, 4, 5}
@ -103,6 +104,11 @@ var (
mockTopic12 = common.HexToHash("0x06")
mockTopic21 = common.HexToHash("0x05")
mockTopic22 = common.HexToHash("0x07")
mockTopic31 = common.HexToHash("0x08")
mockTopic41 = common.HexToHash("0x09")
mockTopic42 = common.HexToHash("0x0a")
mockTopic43 = common.HexToHash("0x0b")
mockTopic51 = common.HexToHash("0x0c")
MockLog1 = &types.Log{
Address: Address,
Topics: []common.Hash{mockTopic11, mockTopic12},
@ -119,6 +125,31 @@ var (
TxIndex: 1,
Index: 1,
}
MockLog3 = &types.Log{
Address: AnotherAddress1,
Topics: []common.Hash{mockTopic31},
Data: []byte{},
BlockNumber: BlockNumber.Uint64(),
TxIndex: 1,
Index: 1,
}
MockLog4 = &types.Log{
Address: AnotherAddress1,
Topics: []common.Hash{mockTopic41, mockTopic42, mockTopic43},
Data: []byte{},
BlockNumber: BlockNumber.Uint64(),
TxIndex: 1,
Index: 1,
}
MockLog5 = &types.Log{
Address: AnotherAddress1,
Topics: []common.Hash{mockTopic51},
Data: []byte{},
BlockNumber: BlockNumber.Uint64(),
TxIndex: 1,
Index: 1,
}
Tx1 = GetTxnRlp(0, MockTransactions)
Tx2 = GetTxnRlp(1, MockTransactions)
@ -595,7 +626,7 @@ func createLegacyTransactionsAndReceipts() (types.Transactions, types.Receipts,
mockReceipt2.GasUsed = mockReceipt2.CumulativeGasUsed - mockReceipt1.CumulativeGasUsed
mockReceipt3 := types.NewReceipt(common.HexToHash("0x2").Bytes(), false, 175)
mockReceipt3.Logs = []*types.Log{}
mockReceipt3.Logs = []*types.Log{MockLog3, MockLog4, MockLog5}
mockReceipt3.TxHash = signedTrx3.Hash()
mockReceipt3.GasUsed = mockReceipt3.CumulativeGasUsed - mockReceipt2.CumulativeGasUsed

View File

@ -167,3 +167,24 @@ type ConvertedPayload struct {
StateNodes []sdtypes.StateNode
StorageNodes map[string][]sdtypes.StorageNode
}
// customLog represent a log.
type customLog struct {
ID int64 `db:"id"`
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
ReceiptID int64 `db:"receipt_id"`
Address string `db:"address"`
Index int64 `db:"index"`
Data []byte `db:"log_data"`
Topic0 string `db:"topic0"`
Topic1 string `db:"topic1"`
Topic2 string `db:"topic2"`
Topic3 string `db:"topic3"`
RctData []byte `db:"data"`
RctCID string `db:"cid"`
BlockNumber string `db:"block_number"`
BlockHash string `db:"block_hash"`
TxnIndex int64 `db:"txn_index"`
TxHash string `db:"tx_hash"`
}

View File

@ -31,7 +31,6 @@ import (
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
)
@ -94,6 +93,7 @@ type Log struct {
transaction *Transaction
log *types.Log
cid string
receiptCID string
ipldBlock []byte
}
@ -1009,37 +1009,37 @@ func (r *Resolver) GetLogs(ctx context.Context, args struct {
}) (*[]*Log, error) {
ret := make([]*Log, 0, 10)
receiptCIDs, receiptsBytes, txs, err := r.backend.IPLDRetriever.RetrieveReceiptsByBlockHash(args.BlockHash)
// Begin tx
tx, err := r.backend.DB.Beginx()
if err != nil {
return nil, err
}
var logIndexInBlock uint = 0
receipts := make(types.Receipts, len(receiptsBytes))
for index, receiptBytes := range receiptsBytes {
receiptCID := receiptCIDs[index]
receipt := new(types.Receipt)
if err := rlp.DecodeBytes(receiptBytes, receipt); err != nil {
return nil, err
}
filter := eth.ReceiptFilter{
LogAddresses: []string{args.Contract.String()},
}
receipts[index] = receipt
for _, log := range receipt.Logs {
log.Index = logIndexInBlock
logIndexInBlock++
filteredLogs, err := r.backend.Retriever.RetrieveFilteredGQLLogs(tx, filter, &args.BlockHash)
if err = tx.Commit(); err != nil {
return nil, err
}
if args.Contract == nil || *args.Contract == log.Address {
ret = append(ret, &Log{
backend: r.backend,
log: log,
cid: receiptCID,
ipldBlock: receiptBytes,
transaction: &Transaction{
hash: txs[index],
},
})
}
}
rctLog, err := r.backend.Fetcher.FetchGQLLogs(filteredLogs)
if err != nil {
return nil, err
}
for _, l := range rctLog {
ret = append(ret, &Log{
backend: r.backend,
log: l.Log,
cid: l.CID,
receiptCID: l.RctCID,
ipldBlock: l.RctData,
transaction: &Transaction{
hash: l.Log.TxHash,
},
})
}
return &ret, nil

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -36,6 +37,7 @@ import (
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/eth"
"github.com/vulcanize/ipld-eth-server/pkg/eth/test_helpers"
@ -85,6 +87,11 @@ var _ = Describe("GraphQL", func() {
ChainConfig: chainConfig,
VmConfig: vm.Config{},
RPCGasCap: big.NewInt(10000000000),
CacheConfig: pgipfsethdb.CacheConfig{
Name: "graphql_test",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
})
Expect(err).ToNot(HaveOccurred())

View File

@ -29,7 +29,7 @@ import (
func StartHTTPEndpoint(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) (*rpc.Server, error) {
srv := rpc.NewServer()
err := node.RegisterApisFromWhitelist(apis, modules, srv, false)
err := node.RegisterApis(apis, modules, srv, false)
if err != nil {
utils.Fatalf("Could not register HTTP API: %w", err)
}

View File

@ -37,7 +37,7 @@ func StartWSEndpoint(endpoint string, apis []rpc.API, modules []string, wsOrigin
// Register all the APIs exposed by the services
handler := rpc.NewServer()
err = node.RegisterApisFromWhitelist(apis, modules, handler, exposeAll)
err = node.RegisterApis(apis, modules, handler, exposeAll)
if err != nil {
utils.Fatalf("Could not register WS API: %w", err)
}

View File

@ -92,8 +92,7 @@ func NewConfig() (*Config, error) {
viper.BindEnv("ethereum.chainConfig", ETH_CHAIN_CONFIG)
viper.BindEnv("ethereum.supportsStateDiff", ETH_SUPPORTS_STATEDIFF)
//c.DBConfig.Init()
c.dbInit()
ethHTTP := viper.GetString("ethereum.httpPath")
ethHTTPEndpoint := fmt.Sprintf("http://%s", ethHTTP)
nodeInfo, cli, err := getEthNodeAndClient(ethHTTPEndpoint)
@ -213,3 +212,23 @@ func overrideDBConnConfig(con *postgres.ConnectionConfig) {
con.MaxOpen = viper.GetInt("database.server.maxOpen")
con.MaxLifetime = viper.GetInt("database.server.maxLifetime")
}
func (d *Config) dbInit() {
viper.BindEnv("database.name", DATABASE_NAME)
viper.BindEnv("database.hostname", DATABASE_HOSTNAME)
viper.BindEnv("database.port", DATABASE_PORT)
viper.BindEnv("database.user", DATABASE_USER)
viper.BindEnv("database.password", DATABASE_PASSWORD)
viper.BindEnv("database.maxIdle", DATABASE_MAX_IDLE_CONNECTIONS)
viper.BindEnv("database.maxOpen", DATABASE_MAX_OPEN_CONNECTIONS)
viper.BindEnv("database.maxLifetime", DATABASE_MAX_CONN_LIFETIME)
d.DBParams.Name = viper.GetString("database.name")
d.DBParams.Hostname = viper.GetString("database.hostname")
d.DBParams.Port = viper.GetInt("database.port")
d.DBParams.User = viper.GetString("database.user")
d.DBParams.Password = viper.GetString("database.password")
d.DBConfig.MaxIdle = viper.GetInt("database.maxIdle")
d.DBConfig.MaxOpen = viper.GetInt("database.maxOpen")
d.DBConfig.MaxLifetime = viper.GetInt("database.maxLifetime")
}

View File

@ -17,6 +17,15 @@ const (
ETH_GENESIS_BLOCK = "ETH_GENESIS_BLOCK"
ETH_NETWORK_ID = "ETH_NETWORK_ID"
ETH_CHAIN_ID = "ETH_CHAIN_ID"
DATABASE_NAME = "DATABASE_NAME"
DATABASE_HOSTNAME = "DATABASE_HOSTNAME"
DATABASE_PORT = "DATABASE_PORT"
DATABASE_USER = "DATABASE_USER"
DATABASE_PASSWORD = "DATABASE_PASSWORD"
DATABASE_MAX_IDLE_CONNECTIONS = "DATABASE_MAX_IDLE_CONNECTIONS"
DATABASE_MAX_OPEN_CONNECTIONS = "DATABASE_MAX_OPEN_CONNECTIONS"
DATABASE_MAX_CONN_LIFETIME = "DATABASE_MAX_CONN_LIFETIME"
)
// GetEthNodeAndClient returns eth node info and client from path url

View File

@ -20,8 +20,10 @@ import (
"fmt"
"strconv"
"sync"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"
"github.com/vulcanize/ipld-eth-server/pkg/net"
"github.com/ethereum/go-ethereum/core/vm"
@ -104,6 +106,11 @@ func NewServer(settings *Config) (Server, error) {
VmConfig: vm.Config{},
DefaultSender: settings.DefaultSender,
RPCGasCap: settings.RPCGasCap,
CacheConfig: pgipfsethdb.CacheConfig{
Name: "ipld-eth-server",
Size: 3000000, // 3MB
ExpiryDuration: time.Hour,
},
})
return sap, err
}