fixes and cli integration for new options

This commit is contained in:
i-norden 2021-11-11 21:51:14 -06:00
parent b3add308ee
commit 2339b0a5af
22 changed files with 1008 additions and 138 deletions

View File

@ -18,6 +18,7 @@ package main
import (
"bufio"
"context"
"errors"
"fmt"
"math/big"
@ -26,10 +27,7 @@ import (
"time"
"unicode"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/statediff"
"github.com/naoina/toml"
"gopkg.in/urfave/cli.v1"
"github.com/ethereum/go-ethereum/accounts/external"
@ -38,13 +36,18 @@ import (
"github.com/ethereum/go-ethereum/accounts/usbwallet"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/eth/catalyst"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/ethconfig"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/naoina/toml"
"github.com/ethereum/go-ethereum/statediff"
dumpdb "github.com/ethereum/go-ethereum/statediff/indexer/database/dump"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
var (
@ -185,48 +188,82 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
var dbConfig *sql.Config
var indexerConfig interfaces.Config
var clientName, nodeID string
if ctx.GlobalIsSet(utils.StateDiffWritingFlag.Name) {
dbConfig = new(sql.Config)
dbConfig.Hostname = ctx.GlobalString(utils.StateDiffDBHostFlag.Name)
dbConfig.Port = ctx.GlobalInt(utils.StateDiffDBPortFlag.Name)
dbConfig.DatabaseName = ctx.GlobalString(utils.StateDiffDBNameFlag.Name)
dbConfig.Username = ctx.GlobalString(utils.StateDiffDBUserFlag.Name)
dbConfig.Password = ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name)
clientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
if ctx.GlobalIsSet(utils.StateDiffDBNodeIDFlag.Name) {
dbConfig.ID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name)
nodeID = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name)
} else {
utils.Fatalf("Must specify node ID for statediff DB output")
}
if ctx.GlobalIsSet(utils.StateDiffDBClientNameFlag.Name) {
dbConfig.ClientName = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
} else {
utils.Fatalf("Must specify client name for statediff DB output")
dbTypeStr := ctx.GlobalString(utils.StateDiffDBTypeFlag.Name)
dbType, err := shared.ResolveDBType(dbTypeStr)
if err != nil {
utils.Fatalf("%v", err)
}
if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) {
dbConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) {
dbConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) {
dbConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) {
dbConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) {
dbConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second
}
if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) {
dbConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second
switch dbType {
case shared.POSTGRES:
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)
driverType, err := postgres.ResolveDriverType(driverTypeStr)
if err != nil {
utils.Fatalf("%v", err)
}
pgConfig := postgres.Config{
Hostname: ctx.GlobalString(utils.StateDiffDBHostFlag.Name),
Port: ctx.GlobalInt(utils.StateDiffDBPortFlag.Name),
DatabaseName: ctx.GlobalString(utils.StateDiffDBNameFlag.Name),
Username: ctx.GlobalString(utils.StateDiffDBUserFlag.Name),
Password: ctx.GlobalString(utils.StateDiffDBPasswordFlag.Name),
ID: nodeID,
ClientName: clientName,
Driver: driverType,
}
if ctx.GlobalIsSet(utils.StateDiffDBMinConns.Name) {
pgConfig.MinConns = ctx.GlobalInt(utils.StateDiffDBMinConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConns.Name) {
pgConfig.MaxConns = ctx.GlobalInt(utils.StateDiffDBMaxConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxIdleConns.Name) {
pgConfig.MaxIdle = ctx.GlobalInt(utils.StateDiffDBMaxIdleConns.Name)
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnLifetime.Name) {
pgConfig.MaxConnLifetime = ctx.GlobalDuration(utils.StateDiffDBMaxConnLifetime.Name) * time.Second
}
if ctx.GlobalIsSet(utils.StateDiffDBMaxConnIdleTime.Name) {
pgConfig.MaxConnIdleTime = ctx.GlobalDuration(utils.StateDiffDBMaxConnIdleTime.Name) * time.Second
}
if ctx.GlobalIsSet(utils.StateDiffDBConnTimeout.Name) {
pgConfig.ConnTimeout = ctx.GlobalDuration(utils.StateDiffDBConnTimeout.Name) * time.Second
}
indexerConfig = pgConfig
case shared.DUMP:
dumpTypeStr := ctx.GlobalString(utils.StateDiffDBDumpDst.Name)
dumpType, err := dumpdb.ResolveDumpType(dumpTypeStr)
if err != nil {
utils.Fatalf("%v", err)
}
switch dumpType {
case dumpdb.STDERR:
indexerConfig = dumpdb.Config{Dump: os.Stdout}
case dumpdb.STDOUT:
indexerConfig = dumpdb.Config{Dump: os.Stderr}
case dumpdb.DISCARD:
indexerConfig = dumpdb.Config{Dump: dumpdb.NewDiscardWriterCloser()}
default:
utils.Fatalf("unrecognized dump destination: %s", dumpType)
}
default:
utils.Fatalf("unrecognized database type: %s", dbType)
}
}
p := statediff.ServiceParams{
DBParams: dbConfig,
p := statediff.Config{
IndexerConfig: indexerConfig,
ID: nodeID,
ClientName: clientName,
Context: context.Background(),
EnableWriteLoop: ctx.GlobalBool(utils.StateDiffWritingFlag.Name),
NumWorkers: ctx.GlobalUint(utils.StateDiffWorkersFlag.Name),
}

View File

@ -149,6 +149,9 @@ var (
utils.GpoIgnoreGasPriceFlag,
utils.MinerNotifyFullFlag,
utils.StateDiffFlag,
utils.StateDiffDBTypeFlag,
utils.StateDiffDBDriverTypeFlag,
utils.StateDiffDBDumpDst,
utils.StateDiffDBNameFlag,
utils.StateDiffDBPasswordFlag,
utils.StateDiffDBUserFlag,

View File

@ -225,6 +225,9 @@ var AppHelpFlagGroups = []flags.FlagGroup{
Name: "STATE DIFF",
Flags: []cli.Flag{
utils.StateDiffFlag,
utils.StateDiffDBTypeFlag,
utils.StateDiffDBDriverTypeFlag,
utils.StateDiffDBDumpDst,
utils.StateDiffDBNameFlag,
utils.StateDiffDBPasswordFlag,
utils.StateDiffDBUserFlag,

View File

@ -786,6 +786,21 @@ var (
Name: "statediff",
Usage: "Enables the processing of state diffs between each block",
}
StateDiffDBTypeFlag = cli.StringFlag{
Name: "statediff.db.type",
Usage: "Statediff database type",
Value: "postgres",
}
StateDiffDBDriverTypeFlag = cli.StringFlag{
Name: "statediff.db.driver",
Usage: "Statediff database driver type",
Value: "pgx",
}
StateDiffDBDumpDst = cli.StringFlag{
Name: "statediff.dump.dst",
Usage: "Statediff database dump destination (default is stdout)",
Value: "stdout",
}
StateDiffDBHostFlag = cli.StringFlag{
Name: "statediff.db.host",
Usage: "Statediff database hostname/ip",
@ -840,6 +855,7 @@ var (
StateDiffDBClientNameFlag = cli.StringFlag{
Name: "statediff.db.clientname",
Usage: "Client name to use when writing state diffs to database",
Value: "go-ethereum",
}
StateDiffWritingFlag = cli.BoolFlag{
Name: "statediff.writing",
@ -847,7 +863,8 @@ var (
}
StateDiffWorkersFlag = cli.UintFlag{
Name: "statediff.workers",
Usage: "Number of concurrent workers to use during statediff processing (0 = 1)",
Usage: "Number of concurrent workers to use during statediff processing (default 1)",
Value: 1,
}
)
@ -1804,7 +1821,7 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
}
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.ServiceParams) {
func RegisterStateDiffService(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params statediff.Config) {
if err := statediff.New(stack, ethServ, cfg, params); err != nil {
Fatalf("Failed to register the Statediff service: %v", err)
}

View File

@ -79,6 +79,9 @@ This service introduces a CLI flag namespace `statediff`
`--statediff` flag is used to turn on the service
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db.type` is the type of database we write out to (current options: postgres and dump)
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
`--statediff.db.host` is the hostname/ip to dial to connect to the database
`--statediff.db.port` is the port to dial to connect to the database
`--statediff.db.name` is the name of the database to connect to

View File

@ -30,21 +30,22 @@ import (
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
BlockNumber uint64
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
close func(blockTx *BatchTx, err error) error
submit func(blockTx *BatchTx, err error) error
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.close(tx, err)
return tx.submit(tx, err)
}
func (tx *BatchTx) flush() error {
if _, err := fmt.Fprintf(tx.dump, "%+v", tx.ipldCache); err != nil {
if _, err := fmt.Fprintf(tx.dump, "%+v\r\n", tx.ipldCache); err != nil {
return err
}
tx.ipldCache = models.IPLDBatch{}

View File

@ -17,15 +17,63 @@
package dump
import (
"fmt"
"io"
"strings"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
// DumpType to explicitly type the dump destination
type DumpType string
const (
STDOUT = "Stdout"
STDERR = "Stderr"
DISCARD = "Discard"
UNKNOWN = "Unknown"
)
// ResolveDumpType resolves the dump type for the provided string
func ResolveDumpType(str string) (DumpType, error) {
switch strings.ToLower(str) {
case "stdout", "out", "std out":
return STDOUT, nil
case "stderr", "err", "std err":
return STDERR, nil
case "discard", "void", "devnull", "dev null":
return DISCARD, nil
default:
return UNKNOWN, fmt.Errorf("unrecognized dump type: %s", str)
}
}
// Config for data dump
type Config struct {
Dump io.WriteCloser
}
// Type satisfies interfaces.Config
func (c Config) Type() shared.DBType {
return shared.DUMP
}
// NewDiscardWriterCloser returns a discardWrapper wrapping io.Discard
func NewDiscardWriterCloser() io.WriteCloser {
return discardWrapper{blackhole: io.Discard}
}
// discardWrapper wraps io.Discard with io.Closer
type discardWrapper struct {
blackhole io.Writer
}
// Write satisfies io.Writer
func (dw discardWrapper) Write(b []byte) (int, error) {
return dw.blackhole.Write(b)
}
// Close satisfies io.Closer
func (dw discardWrapper) Close() error {
return nil
}

View File

@ -102,11 +102,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
blockTx := &BatchTx{
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
close: func(self *BatchTx, err error) error {
BlockNumber: height,
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
submit: func(self *BatchTx, err error) error {
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
@ -205,7 +206,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
Timestamp: header.Time,
BaseFee: baseFee,
}
_, err := fmt.Fprintf(sdi.dump, "%+v", mod)
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return 0, err
}
@ -228,7 +229,7 @@ func (sdi *StateDiffIndexer) processUncles(tx *BatchTx, headerID int64, blockNum
BlockHash: uncleNode.Hash().String(),
Reward: uncleReward.String(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", uncle); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", uncle); err != nil {
return err
}
}
@ -319,7 +320,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
if txType != types.LegacyTxType {
txModel.Type = &txType
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", txModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", txModel); err != nil {
return err
}
@ -334,7 +335,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
Address: accessListElement.Address.Hex(),
StorageKeys: storageKeys,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", accessListElementModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accessListElementModel); err != nil {
return err
}
}
@ -357,11 +358,11 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
rctModel.PostState = common.Bytes2Hex(receipt.PostState)
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", rctModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", rctModel); err != nil {
return err
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", logDataSet); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", logDataSet); err != nil {
return err
}
}
@ -392,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: shared.RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
_, err := fmt.Fprintf(sdi.dump, "%+v", stateModel)
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", stateModel)
return err
}
stateCIDStr, stateMhKey, err := tx.cacheRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
@ -407,7 +408,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
NodeType: stateNode.NodeType.Int(),
}
// index the state node, collect the stateID to reference by FK
if _, err := fmt.Fprintf(sdi.dump, "%+v", stateModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", stateModel); err != nil {
return err
}
// if we have a leaf, decode and index the account data
@ -429,7 +430,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
CodeHash: account.CodeHash,
StorageRoot: account.Root.String(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", accountModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", accountModel); err != nil {
return err
}
}
@ -445,7 +446,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: shared.RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", storageModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
}
continue
@ -461,7 +462,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
if _, err := fmt.Fprintf(sdi.dump, "%+v", storageModel); err != nil {
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", storageModel); err != nil {
return err
}
}

View File

@ -19,33 +19,33 @@ package sql
import (
"context"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/lib/pq"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
)
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
ctx context.Context
dbtx Tx
headerID int64
stm string
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
BlockNumber uint64
ctx context.Context
dbtx Tx
headerID int64
stm string
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
close func(blockTx *BatchTx, err error) error
submit func(blockTx *BatchTx, err error) error
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.close(tx, err)
return tx.submit(tx, err)
}
func (tx *BatchTx) flush() error {

View File

@ -141,13 +141,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
}()
blockTx := &BatchTx{
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
dbtx: tx,
ctx: sdi.ctx,
BlockNumber: height,
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
dbtx: tx,
// handle transaction commit or rollback for any return case
close: func(self *BatchTx, err error) error {
submit: func(self *BatchTx, err error) error {
close(self.quit)
close(self.iplds)
if p := recover(); p != nil {

View File

@ -0,0 +1,88 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sql_test
import (
"context"
"testing"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
)
func setupLegacyPGX(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
db, err = postgres.SetupPGXDB()
require.NoError(t, err)
ind, err = sql.NewStateDiffIndexer(context.Background(), legacyData.Config, db)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockLegacyBlock,
legacyData.MockReceipts,
legacyData.MockBlock.Difficulty())
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
}()
for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node)
require.NoError(t, err)
}
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}
func TestPGXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, id, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
ID int
BaseFee *int64 `db:"base_fee"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).StructScan(header)
require.NoError(t, err)
test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String())
test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250")
require.Nil(t, legacyData.MockHeader.BaseFee)
require.Nil(t, header.BaseFee)
})
}

View File

@ -0,0 +1,609 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package sql_test
import (
"bytes"
"context"
"fmt"
"os"
"testing"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
)
func init() {
if os.Getenv("MODE") != "statediff" {
fmt.Println("Skipping statediff test")
os.Exit(0)
}
mockBlock = mocks.MockBlock
txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts
buf := new(bytes.Buffer)
txs.EncodeIndex(0, buf)
tx1 = make([]byte, buf.Len())
copy(tx1, buf.Bytes())
buf.Reset()
txs.EncodeIndex(1, buf)
tx2 = make([]byte, buf.Len())
copy(tx2, buf.Bytes())
buf.Reset()
txs.EncodeIndex(2, buf)
tx3 = make([]byte, buf.Len())
copy(tx3, buf.Bytes())
buf.Reset()
txs.EncodeIndex(3, buf)
tx4 = make([]byte, buf.Len())
copy(tx4, buf.Bytes())
buf.Reset()
txs.EncodeIndex(4, buf)
tx5 = make([]byte, buf.Len())
copy(tx5, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(0, buf)
rct1 = make([]byte, buf.Len())
copy(rct1, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(1, buf)
rct2 = make([]byte, buf.Len())
copy(rct2, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(2, buf)
rct3 = make([]byte, buf.Len())
copy(rct3, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(3, buf)
rct4 = make([]byte, buf.Len())
copy(rct4, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(4, buf)
rct5 = make([]byte, buf.Len())
copy(rct5, buf.Bytes())
buf.Reset()
headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockHeaderRlp, multihash.KECCAK_256)
trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx1, multihash.KECCAK_256)
trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx2, multihash.KECCAK_256)
trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx3, multihash.KECCAK_256)
trx4CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx4, multihash.KECCAK_256)
trx5CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx5, multihash.KECCAK_256)
rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct1, multihash.KECCAK_256)
rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct2, multihash.KECCAK_256)
rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct3, multihash.KECCAK_256)
rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256)
rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256)
state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.ContractLeafNode, multihash.KECCAK_256)
state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.AccountLeafNode, multihash.KECCAK_256)
storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256)
}
func setupPGX(t *testing.T) {
db, err = postgres.SetupPGXDB()
if err != nil {
t.Fatal(err)
}
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockBlock,
mocks.MockReceipts,
mocks.MockBlock.Difficulty())
if err != nil {
t.Fatal(err)
}
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
}()
for _, node := range mocks.StateDiffs {
err = ind.PushStateNode(tx, node)
if err != nil {
t.Fatal(err)
}
}
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
}
func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, id, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
type res struct {
CID string
TD string
Reward string
ID int
BaseFee *int64 `db:"base_fee"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).StructScan(header)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, header.CID, headerCID.String())
test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String())
test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250")
test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64())
dc, err := cid.Decode(header.CID)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp)
})
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
// check that txs were properly indexed
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(trxs), 5)
expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String()))
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String()))
// and published
for _, c := range trxs {
dc, err := cid.Decode(c)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType *uint8
pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
err = db.Get(context.Background(), &txType, pgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType *uint8
pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
err = db.Get(context.Background(), &txType, pgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType *uint8
pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
err = db.Get(context.Background(), &txType, pgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType *uint8
pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
err = db.Get(context.Background(), &txType, pgStr, c)
if err != nil {
t.Fatal(err)
}
if *txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", *txType)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC`
err = db.Select(context.Background(), &accessListElementModels, pgStr, c)
if err != nil {
t.Fatal(err)
}
if len(accessListElementModels) != 2 {
t.Fatalf("expected two access list entries, got %d", len(accessListElementModels))
}
model1 := models.AccessListElementModel{
Index: accessListElementModels[0].Index,
Address: accessListElementModels[0].Address,
}
model2 := models.AccessListElementModel{
Index: accessListElementModels[1].Index,
Address: accessListElementModels[1].Address,
StorageKeys: accessListElementModels[1].StorageKeys,
}
test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model)
test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model)
case trx5CID.String():
test_helpers.ExpectEqual(t, data, tx5)
var txType *uint8
pgStr = `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1`
err = db.Get(context.Background(), &txType, pgStr, c)
if err != nil {
t.Fatal(err)
}
if *txType != types.DynamicFeeTxType {
t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType)
}
}
}
})
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.block_number = $1
ORDER BY transaction_cids.index`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
type logIPLD struct {
Index int `db:"index"`
Address string `db:"address"`
Data []byte `db:"data"`
Topic0 string `db:"topic0"`
Topic1 string `db:"topic1"`
}
for i := range rcts {
results := make([]logIPLD, 0)
pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids
INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id)
INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key)
WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC`
err = db.Select(context.Background(), &results, pgStr, rcts[i])
require.NoError(t, err)
// expecting MockLog1 and MockLog2 for mockReceipt4
expectedLogs := mocks.MockReceipts[i].Logs
test_helpers.ExpectEqual(t, len(results), len(expectedLogs))
var nodeElements []interface{}
for idx, r := range results {
// Decode the log leaf node.
err = rlp.DecodeBytes(r.Data, &nodeElements)
require.NoError(t, err)
logRaw, err := rlp.EncodeToBytes(expectedLogs[idx])
require.NoError(t, err)
// 2nd element of the leaf node contains the encoded log data.
test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte))
}
}
})
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
// check receipts were properly indexed
rcts := make([]string, 0)
pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
WHERE receipt_cids.tx_id = transaction_cids.id
AND transaction_cids.header_id = header_cids.id
AND header_cids.block_number = $1 order by transaction_cids.id`
err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(rcts), 5)
for idx, rctLeafCID := range rcts {
result := make([]models.IPLDModel, 0)
pgStr = `SELECT data
FROM eth.receipt_cids
INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = public.blocks.key)
WHERE receipt_cids.leaf_cid = $1`
err = db.Select(context.Background(), &result, pgStr, rctLeafCID)
if err != nil {
t.Fatal(err)
}
// Decode the log leaf node.
var nodeElements []interface{}
err = rlp.DecodeBytes(result[0].Data, &nodeElements)
require.NoError(t, err)
expectedRct, err := mocks.MockReceipts[idx].MarshalBinary()
require.NoError(t, err)
test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte))
}
// and published
for _, c := range rcts {
dc, err := cid.Decode(c)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
var data []byte
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
switch c {
case rct1CID.String():
test_helpers.ExpectEqual(t, data, rct1)
var postStatus uint64
pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1`
err = db.Get(context.Background(), &postStatus, pgStr, c)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus)
case rct2CID.String():
test_helpers.ExpectEqual(t, data, rct2)
var postState string
pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1`
err = db.Get(context.Background(), &postState, pgStr, c)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1)
case rct3CID.String():
test_helpers.ExpectEqual(t, data, rct3)
var postState string
pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1`
err = db.Get(context.Background(), &postState, pgStr, c)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2)
case rct4CID.String():
test_helpers.ExpectEqual(t, data, rct4)
var postState string
pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1`
err = db.Get(context.Background(), &postState, pgStr, c)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3)
case rct5CID.String():
test_helpers.ExpectEqual(t, data, rct5)
var postState string
pgStr = `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1`
err = db.Get(context.Background(), &postState, pgStr, c)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3)
}
}
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1 AND node_type != 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(stateNodes), 2)
for _, stateNode := range stateNodes {
var data []byte
dc, err := cid.Decode(stateNode.CID)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1`
var account models.StateAccountModel
err = db.Get(context.Background(), &account, pgStr, stateNode.ID)
if err != nil {
t.Fatal(err)
}
if stateNode.CID == state1CID.String() {
test_helpers.ExpectEqual(t, stateNode.NodeType, 2)
test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex())
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'})
test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
ID: account.ID,
StateID: stateNode.ID,
Balance: "0",
CodeHash: mocks.ContractCodeHash.Bytes(),
StorageRoot: mocks.ContractRoot,
Nonce: 1,
})
}
if stateNode.CID == state2CID.String() {
test_helpers.ExpectEqual(t, stateNode.NodeType, 2)
test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex())
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'})
test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode)
test_helpers.ExpectEqual(t, account, models.StateAccountModel{
ID: account.ID,
StateID: stateNode.ID,
Balance: "1000",
CodeHash: mocks.AccountCodeHash.Bytes(),
StorageRoot: mocks.AccountRoot,
Nonce: 0,
})
}
}
// check that Removed state nodes were properly indexed and published
stateNodes = make([]models.StateNodeModel, 0)
pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1 AND node_type = 3`
err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(stateNodes), 1)
stateNode := stateNodes[0]
var data []byte
dc, err := cid.Decode(stateNode.CID)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID)
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'})
test_helpers.ExpectEqual(t, data, []byte{})
})
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(storageNodes), 1)
test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
CID: storageCID.String(),
NodeType: 2,
StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(),
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
Path: []byte{},
})
var data []byte
dc, err := cid.Decode(storageNodes[0].CID)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode)
// check that Removed storage nodes were properly indexed
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1
AND storage_cids.node_type = 3`
err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, len(storageNodes), 1)
test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
CID: shared.RemovedNodeStorageCID,
NodeType: 3,
StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(),
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
Path: []byte{'\x03'},
})
dc, err = cid.Decode(storageNodes[0].CID)
if err != nil {
t.Fatal(err)
}
mhKey = dshelp.MultihashToDsKey(dc.Hash())
prefixedKey = blockstore.BlockPrefix.String() + mhKey.String()
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, data, []byte{})
})
}

View File

@ -18,18 +18,33 @@ package postgres
import (
"fmt"
"strings"
"time"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
// DriverType to explicity type the kind of sql driver we are using
type DriverType string
const (
PGX DriverType = "PGX"
SQLX DriverType = "SQLX"
PGX DriverType = "PGX"
SQLX DriverType = "SQLX"
Unknown DriverType = "Unknown"
)
// ResolveDriverType resolves a DriverType from a provided string
func ResolveDriverType(str string) (DriverType, error) {
switch strings.ToLower(str) {
case "pgx", "pgxpool":
return PGX, nil
case "sqlx":
return SQLX, nil
default:
return Unknown, fmt.Errorf("unrecognized driver type string: %s", str)
}
}
// DefaultConfig are default parameters for connecting to a Postgres sql
var DefaultConfig = Config{
Hostname: "localhost",
@ -64,10 +79,12 @@ type Config struct {
Driver DriverType
}
// Type satisfies interfaces.Config
func (c Config) Type() shared.DBType {
return shared.POSTGRES
}
// DbConnectionString constructs and returns the connection string from the config
func (c Config) DbConnectionString() string {
if len(c.Username) > 0 && len(c.Password) > 0 {
return fmt.Sprintf("postgresql://%s:%s@%s:%d/%s?sslmode=disable",

View File

@ -105,8 +105,8 @@ func (pgx *PGXDriver) createNode() error {
// QueryRow satisfies sql.Database
func (pgx *PGXDriver) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
row := pgx.pool.QueryRow(ctx, sql, args...)
return rowWrapper{row: row}
rows, _ := pgx.pool.Query(ctx, sql, args...)
return rowsWrapper{rows: rows}
}
// Exec satisfies sql.Database
@ -160,18 +160,18 @@ func (pgx *PGXDriver) Context() context.Context {
return pgx.ctx
}
type rowWrapper struct {
row pgx.Row
type rowsWrapper struct {
rows pgx.Rows
}
// Scan satisfies sql.ScannableRow
func (r rowWrapper) Scan(dest ...interface{}) error {
return r.row.Scan(dest)
func (r rowsWrapper) Scan(dest ...interface{}) error {
return (pgx.Row)(r.rows).Scan(dest...)
}
// StructScan satisfies sql.ScannableRow
func (r rowWrapper) StructScan(dest interface{}) error {
return pgxscan.ScanRow(dest, r.row.(pgx.Rows))
func (r rowsWrapper) StructScan(dest interface{}) error {
return pgxscan.ScanRow(dest, r.rows)
}
type resultWrapper struct {
@ -234,8 +234,8 @@ type pgxTxWrapper struct {
// QueryRow satisfies sql.Tx
func (t pgxTxWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
row := t.tx.QueryRow(ctx, sql, args...)
return rowWrapper{row: row}
rows, _ := t.tx.Query(ctx, sql, args...)
return rowsWrapper{rows: rows}
}
// Exec satisfies sql.Tx

View File

@ -86,15 +86,15 @@ func TestPostgresPGX(t *testing.T) {
t.Fatal(err)
}
var data pgtype.Numeric
var data pgtype.Text
err = dbPool.QueryRow(ctx, `SELECT data FROM example WHERE id = 1`).Scan(&data)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, bi.String(), data)
test_helpers.ExpectEqual(t, data, bi.String())
actual := new(big.Int)
actual.Set(data.Int)
actual.SetString(data.String, 10)
test_helpers.ExpectEqual(t, actual, bi)
})

View File

@ -177,8 +177,7 @@ type sqlxTxWrapper struct {
// QueryRow satisfies sql.Tx
func (t sqlxTxWrapper) QueryRow(ctx context.Context, sql string, args ...interface{}) sql.ScannableRow {
row := t.tx.QueryRow(sql, args...)
return rowWrapper{row: row}
return t.tx.QueryRowx(sql, args...)
}
// Exec satisfies sql.Tx

View File

@ -38,9 +38,8 @@ func TestPostgresSQLX(t *testing.T) {
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig, err)
t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err)
}
if sqlxdb == nil {
t.Fatal("DB is nil")
@ -88,7 +87,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, bi.String(), data)
test_helpers.ExpectEqual(t, data, bi.String())
actual := new(big.Int)
actual.SetString(data, 10)
test_helpers.ExpectEqual(t, actual, bi)

View File

@ -20,6 +20,9 @@ import (
"context"
"testing"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
@ -37,34 +40,38 @@ var (
legacyHeaderCID cid.Cid
)
func setupLegacy(t *testing.T) {
func setupLegacySQLX(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
db, err = test_helpers.SetupDB()
db, err = postgres.SetupSQLXDB()
require.NoError(t, err)
ind, err = sql.NewSQLIndexer(context.Background(), legacyData.Config, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), legacyData.Config, db)
require.NoError(t, err)
var tx *sql.BlockTx
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockLegacyBlock,
legacyData.MockReceipts,
legacyData.MockBlock.Difficulty())
require.NoError(t, err)
defer tx.Close(tx, err)
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
}()
for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node)
require.NoError(t, err)
}
test_helpers.ExpectEqual(t, tx.BlockNumber, legacyData.BlockNumber.Uint64())
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}
func TestPublishAndIndexerLegacy(t *testing.T) {
func TestSQLXIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) {
setupLegacy(t)
setupLegacySQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, id, base_fee
FROM eth.header_cids

View File

@ -32,19 +32,20 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
)
var (
db sql.Database
err error
ind *interfaces.StateDiffIndexer
ind interfaces.StateDiffIndexer
ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
@ -136,14 +137,14 @@ func init() {
storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256)
}
func setup(t *testing.T) {
db, err = test_helpers.SetupDB()
func setupSQLX(t *testing.T) {
db, err = postgres.SetupSQLXDB()
if err != nil {
t.Fatal(err)
}
ind, err = indexer.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db)
ind, err = sql.NewStateDiffIndexer(context.Background(), mocks.TestConfig, db)
require.NoError(t, err)
var tx *sql.BlockTx
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockBlock,
mocks.MockReceipts,
@ -151,7 +152,11 @@ func setup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer tx.Close(tx, err)
defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
}()
for _, node := range mocks.StateDiffs {
err = ind.PushStateNode(tx, node)
if err != nil {
@ -159,7 +164,7 @@ func setup(t *testing.T) {
}
}
test_helpers.ExpectEqual(t, tx.BlockNumber, mocks.BlockNumber.Uint64())
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
}
func tearDown(t *testing.T) {
@ -169,9 +174,9 @@ func tearDown(t *testing.T) {
}
}
func TestPublishAndIndexer(t *testing.T) {
func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, id, base_fee
FROM eth.header_cids
@ -208,7 +213,7 @@ func TestPublishAndIndexer(t *testing.T) {
})
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
// check that txs were properly indexed
trxs := make([]string, 0)
@ -318,7 +323,7 @@ func TestPublishAndIndexer(t *testing.T) {
})
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
rcts := make([]string, 0)
@ -368,7 +373,7 @@ func TestPublishAndIndexer(t *testing.T) {
})
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
// check receipts were properly indexed
@ -470,7 +475,7 @@ func TestPublishAndIndexer(t *testing.T) {
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
@ -548,18 +553,18 @@ func TestPublishAndIndexer(t *testing.T) {
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
test_helpers.ExpectEqual(t, prefixedKey, sql.RemovedNodeMhKey)
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
test_helpers.ExpectEqual(t, stateNode.CID, sql.RemovedNodeStateCID)
test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID)
test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'})
test_helpers.ExpectEqual(t, data, []byte{})
})
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setup(t)
setupSQLX(t)
defer tearDown(t)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
@ -608,7 +613,7 @@ func TestPublishAndIndexer(t *testing.T) {
}
test_helpers.ExpectEqual(t, len(storageNodes), 1)
test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
CID: sql.RemovedNodeStorageCID,
CID: shared.RemovedNodeStorageCID,
NodeType: 3,
StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(),
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
@ -620,7 +625,7 @@ func TestPublishAndIndexer(t *testing.T) {
}
mhKey = dshelp.MultihashToDsKey(dc.Hash())
prefixedKey = blockstore.BlockPrefix.String() + mhKey.String()
test_helpers.ExpectEqual(t, prefixedKey, sql.RemovedNodeMhKey)
test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey)
err = db.Get(context.Background(), &data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)

View File

@ -49,6 +49,18 @@ func TearDownDB(t *testing.T, db Database) {
if err != nil {
t.Fatal(err)
}
_, err = tx.Exec(ctx, `DELETE FROM eth.state_accounts`)
if err != nil {
t.Fatal(err)
}
_, err = tx.Exec(ctx, `DELETE FROM eth.access_list_element`)
if err != nil {
t.Fatal(err)
}
_, err = tx.Exec(ctx, `DELETE FROM eth.log_cids`)
if err != nil {
t.Fatal(err)
}
_, err = tx.Exec(ctx, `DELETE FROM blocks`)
if err != nil {
t.Fatal(err)

View File

@ -16,9 +16,28 @@
package shared
import (
"fmt"
"strings"
)
// DBType to explicitly type the kind of DB
type DBType string
const (
POSTGRES DBType = "Postgres"
DUMP DBType = "Dump"
UNKOWN DBType = "Unknown"
)
// ResolveDBType resolves a DBType from a provided string
func ResolveDBType(str string) (DBType, error) {
switch strings.ToLower(str) {
case "postgres", "pg":
return POSTGRES, nil
case "dump", "d":
return DUMP, nil
default:
return UNKOWN, fmt.Errorf("unrecognized db type string: %s", str)
}
}

View File

@ -218,7 +218,6 @@ func (lbc *BlockCache) getParentBlock(currentBlock *types.Block, bc blockChain)
type workerParams struct {
chainEventCh <-chan core.ChainEvent
errCh <-chan error
wg *sync.WaitGroup
id uint
}
@ -239,14 +238,21 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
chainEventFwd <- chainEvent
case err := <-errCh:
log.Error("Error from chain event subscription", "error", err)
close(sds.QuitChan)
case <-sds.QuitChan:
log.Info("Quitting the statediffing writing loop")
if err := sds.indexer.Close(); err != nil {
log.Error("Error closing indexer", "err", err)
}
return
}
}
}()
wg.Add(int(sds.numWorkers))
for worker := uint(0); worker < sds.numWorkers; worker++ {
params := workerParams{chainEventCh: chainEventFwd, errCh: errCh, wg: &wg, id: worker}
params := workerParams{chainEventCh: chainEventFwd, wg: &wg, id: worker}
go sds.writeLoopWorker(params)
}
wg.Wait()
@ -291,13 +297,8 @@ func (sds *Service) writeLoopWorker(params workerParams) {
}
// TODO: how to handle with concurrent workers
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
case err := <-params.errCh:
log.Warn("Error from chain event subscription", "error", err, "worker", params.id)
sds.close()
return
case <-sds.QuitChan:
log.Info("Quitting the statediff writing process", "worker", params.id)
sds.close()
return
}
}
@ -335,11 +336,10 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
sds.streamStateDiff(currentBlock, parentBlock.Root())
case err := <-errCh:
log.Warn("Error from chain event subscription", "error", err)
sds.close()
return
log.Error("Error from chain event subscription", "error", err)
close(sds.QuitChan)
case <-sds.QuitChan:
log.Info("Quitting the statediffing process")
log.Info("Quitting the statediffing listening loop")
sds.close()
return
}