Updates #152

Merged
telackey merged 5 commits from schema_updates into postgres_refactor 2021-11-18 15:22:05 +00:00
17 changed files with 84 additions and 82 deletions
Showing only changes of commit bb788f71c6 - Show all commits

View File

@ -206,11 +206,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
switch dbType {
case shared.FILE:
if !ctx.GlobalIsSet(utils.StateDiffFileNodeRowIDFlag.Name) {
utils.Fatalf("In statediff file writing mode a node row ID must be provided")
}
indexerConfig = file.Config{
NodeID: int64(ctx.GlobalInt(utils.StateDiffFileNodeRowIDFlag.Name)),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
}
case shared.POSTGRES:

View File

@ -167,7 +167,6 @@ var (
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileNodeRowIDFlag,
utils.StateDiffFilePath,
configFileFlag,
utils.CatalystFlag,

View File

@ -243,7 +243,6 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileNodeRowIDFlag,
utils.StateDiffFilePath,
},
},

View File

@ -852,10 +852,6 @@ var (
Name: "statediff.db.nodeid",
Usage: "Node ID to use when writing state diffs to database",
}
StateDiffFileNodeRowIDFlag = cli.IntFlag{
Name: "statediff.file.nodeid",
Usage: "Node row ID to use as FK when writing state diffs to database",
}
StateDiffFilePath = cli.StringFlag{
Name: "statediff.file.path",
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",

View File

@ -79,7 +79,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff` flag is used to turn on the service
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db.type` is the type of database we write out to (current options: postgres and dump)
`--statediff.db.type` is the type of database we write out to (current options: postgres, dump, file)
`--statediff.dump.dst` is the destination to write to when operating in database dump mode (stdout, stderr, discard)
`--statediff.db.driver` is the specific driver to use for the database (current options for postgres: pgx and sqlx)
`--statediff.db.host` is the hostname/ip to dial to connect to the database
@ -95,6 +95,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.db.maxconnlifetime` is the maximum lifetime for a connection (in seconds)
`--statediff.db.nodeid` is the node id to use in the Postgres database
`--statediff.db.clientname` is the client name to use in the Postgres database
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)

View File

@ -38,6 +38,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
if !ok {
return nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
}
fc.NodeInfo = nodeInfo
return file.NewStateDiffIndexer(ctx, chainConfig, fc)
case shared.POSTGRES:
pgc, ok := config.(postgres.Config)

View File

@ -17,13 +17,14 @@
package file
import (
"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)
// Config holds params for writing sql statements out to a file
type Config struct {
NodeID int64 // this is the nodeID used as FK in eth.header_cids
FilePath string
NodeInfo node.Info
}
// Type satisfies interfaces.Config

View File

@ -55,7 +55,7 @@ var (
type StateDiffIndexer struct {
writer *SQLWriter
chainConfig *params.ChainConfig
nodeID int64
nodeID string
wg *sync.WaitGroup
}
@ -78,7 +78,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
return &StateDiffIndexer{
writer: w,
chainConfig: chainConfig,
nodeID: config.NodeID,
nodeID: config.NodeInfo.ID,
wg: wg,
}, nil
}

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
)
var (
@ -102,15 +103,18 @@ func (sqw *SQLWriter) flush() error {
}
const (
nodeInsert = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES (%s, %s, %s, %s, %d)
ON CONFLICT (node_id) DO NOTHING;\n`
ipldInsert = `INSERT INTO public.blocks (key, data) VALUES (%s, %x) ON CONFLICT (key) DO NOTHING;\n`
headerInsert = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n`
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, %d)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, %d);\n`
headerInsertWithoutBaseFee = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee)
VALUES (%s, %s, %s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %d, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n`
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, %d, NULL)
ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %d, %s, eth.header_cids.times_validated + 1, NULL);\n`
uncleInsert = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (block_hash) DO NOTHING;\n`
@ -137,6 +141,12 @@ ON CONFLICT (header_id, state_path) DO NOTHING;\n`
ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (%s, %s, %d, %t, %s);\n`
)
// ON CONFLICT (node_id) DO UPDATE SET genesis_block = %s, network_id = %s, client_name = %s, chain_id = %s;\n`
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID))
}
func (sqw *SQLWriter) upsertIPLD(ipld models.IPLDModel) {
sqw.stmts <- []byte(fmt.Sprintf(ipldInsert, ipld.Key, ipld.Data))
}

View File

@ -36,7 +36,7 @@ type Driver interface {
Get(ctx context.Context, dest interface{}, query string, args ...interface{}) error
Begin(ctx context.Context) (Tx, error)
Stats() Stats
NodeID() int64
NodeID() string
Context() context.Context
io.Closer
}

View File

@ -26,7 +26,6 @@ import (
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
@ -153,7 +152,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, base_fee
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee
FROM eth.header_cids
WHERE block_number = $1`
// check header was properly indexed
@ -165,7 +164,12 @@ func TestPGXIndexer(t *testing.T) {
BaseFee *int64 `db:"base_fee"`
}
header := new(res)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header)
err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan(
&header.CID,
&header.TD,
&header.Reward,
&header.BlockHash,
&header.BaseFee)
if err != nil {
t.Fatal(err)
}
@ -221,43 +225,43 @@ func TestPGXIndexer(t *testing.T) {
switch c {
case trx1CID.String():
test_helpers.ExpectEqual(t, data, tx1)
var txType *uint8
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx2CID.String():
test_helpers.ExpectEqual(t, data, tx2)
var txType *uint8
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx3CID.String():
test_helpers.ExpectEqual(t, data, tx3)
var txType *uint8
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
if txType != nil {
t.Fatalf("expected nil tx_type, got %d", *txType)
if txType != 0 {
t.Fatalf("expected tx_type 0, got %d", txType)
}
case trx4CID.String():
test_helpers.ExpectEqual(t, data, tx4)
var txType *uint8
var txType uint8
err = db.Get(context.Background(), &txType, txTypePgStr, c)
if err != nil {
t.Fatal(err)
}
if *txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", *txType)
if txType != types.AccessListTxType {
t.Fatalf("expected AccessListTxType (1), got %d", txType)
}
accessListElementModels := make([]models.AccessListElementModel, 0)
pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC`
@ -467,7 +471,7 @@ func TestPGXIndexer(t *testing.T) {
if err != nil {
t.Fatal(err)
}
pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2`
pgStr = `SELECT header_id, state_path, cast(balance AS TEXT), nonce, code_hash, storage_root from eth.state_accounts WHERE header_id = $1 AND state_path = $2`
var account models.StateAccountModel
err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path)
if err != nil {

View File

@ -30,7 +30,6 @@ type DriverType string
const (
PGX DriverType = "PGX"
SQLX DriverType = "SQLX"
FILE DriverType = "File"
Unknown DriverType = "Unknown"
)
@ -41,8 +40,6 @@ func ResolveDriverType(str string) (DriverType, error) {
return PGX, nil
case "sqlx":
return SQLX, nil
case "file":
return FILE, nil
default:
return Unknown, fmt.Errorf("unrecognized driver type string: %s", str)
}
@ -52,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) {
var DefaultConfig = Config{
Hostname: "localhost",
Port: 5432,
DatabaseName: "vulcanize_test",
DatabaseName: "vulcanize_testing",
Username: "postgres",
Password: "",
Password: "password",
}
// Config holds params for a Postgres db

View File

@ -22,14 +22,7 @@ var _ sql.Database = &DB{}
const (
createNodeStm = `INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (genesis_block, network_id, node_id, chain_id)
DO UPDATE
SET genesis_block = $1,
network_id = $2,
node_id = $3,
client_name = $4,
chain_id = $5
RETURNING id`
ON CONFLICT (node_id) DO NOTHING`
)
// NewPostgresDB returns a postgres.DB using the provided driver
@ -37,7 +30,7 @@ func NewPostgresDB(driver sql.Driver) *DB {
return &DB{driver}
}
// DB implements sql.Databse using a configured driver and Postgres statement syntax
// DB implements sql.Database using a configured driver and Postgres statement syntax
type DB struct {
sql.Driver
}

View File

@ -34,7 +34,7 @@ type PGXDriver struct {
ctx context.Context
pool *pgxpool.Pool
nodeInfo node.Info
nodeID int64
nodeID string
}
// NewPGXDriver returns a new pgx driver
@ -89,17 +89,16 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
}
func (pgx *PGXDriver) createNode() error {
var nodeID int64
err := pgx.pool.QueryRow(
_, err := pgx.pool.Exec(
pgx.ctx,
createNodeStm,
pgx.nodeInfo.GenesisBlock, pgx.nodeInfo.NetworkID,
pgx.nodeInfo.ID, pgx.nodeInfo.ClientName,
pgx.nodeInfo.ChainID).Scan(&nodeID)
pgx.nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
pgx.nodeID = nodeID
pgx.nodeID = pgx.nodeInfo.ID
return nil
}
@ -139,7 +138,7 @@ func (pgx *PGXDriver) Stats() sql.Stats {
}
// NodeID satisfies sql.Database
func (pgx *PGXDriver) NodeID() int64 {
func (pgx *PGXDriver) NodeID() string {
return pgx.nodeID
}

View File

@ -32,7 +32,7 @@ type SQLXDriver struct {
ctx context.Context
db *sqlx.DB
nodeInfo node.Info
nodeID int64
nodeID string
}
// NewSQLXDriver returns a new sqlx driver for Postgres
@ -60,16 +60,15 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri
}
func (driver *SQLXDriver) createNode() error {
var nodeID int64
err := driver.db.QueryRowx(
_, err := driver.db.Exec(
createNodeStm,
driver.nodeInfo.GenesisBlock, driver.nodeInfo.NetworkID,
driver.nodeInfo.ID, driver.nodeInfo.ClientName,
driver.nodeInfo.ChainID).Scan(&nodeID)
driver.nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
driver.nodeID = nodeID
driver.nodeID = driver.nodeInfo.ID
return nil
}
@ -108,7 +107,7 @@ func (driver *SQLXDriver) Stats() sql.Stats {
}
// NodeID satisfies sql.Database
func (driver *SQLXDriver) NodeID() int64 {
func (driver *SQLXDriver) NodeID() string {
return driver.nodeID
}

View File

@ -32,7 +32,7 @@ type HeaderModel struct {
CID string `db:"cid"`
MhKey string `db:"mh_key"`
TotalDifficulty string `db:"td"`
NodeID int64 `db:"node_id"`
NodeID string `db:"node_id"`
Reward string `db:"reward"`
StateRoot string `db:"state_root"`
UncleRoot string `db:"uncle_root"`

View File

@ -24,6 +24,7 @@ import (
"sort"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -150,10 +151,10 @@ func testSubscriptionAPI(t *testing.T) {
id := rpc.NewID()
payloadChan := make(chan statediff.Payload)
quitChan := make(chan bool)
mockService.Subscribe(id, payloadChan, quitChan, params)
blockChan <- block1
parentBlockChain <- block0
wg := new(sync.WaitGroup)
go func() {
wg.Add(1)
defer wg.Done()
sort.Slice(expectedStateDiffBytes, func(i, j int) bool { return expectedStateDiffBytes[i] < expectedStateDiffBytes[j] })
select {
case payload := <-payloadChan:
@ -173,6 +174,12 @@ func testSubscriptionAPI(t *testing.T) {
case <-quitChan:
t.Errorf("channel quit before delivering payload")
}
}()
time.Sleep(1)
mockService.Subscribe(id, payloadChan, quitChan, params)
blockChan <- block1
parentBlockChain <- block0
wg.Wait()
}
func testHTTPAPI(t *testing.T) {