schema pkg: support postgres inserts, refactor into shared

This commit is contained in:
Roy Crihfield 2023-03-20 14:21:13 +08:00
parent cda966a518
commit d65e742b12
7 changed files with 403 additions and 285 deletions

View File

@ -28,8 +28,8 @@ import (
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
"github.com/ethereum/go-ethereum/statediff/indexer/test"
"github.com/ethereum/go-ethereum/statediff/indexer/test_helpers"
)
@ -90,7 +90,7 @@ func resetAndDumpWatchedAddressesCSVFileData(t *testing.T) {
test_helpers.TearDownDB(t, db)
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
stmt := fmt.Sprintf(pgCopyStatement, schema.TableWatchedAddresses.Name, outputFilePath)
_, err = db.Exec(context.Background(), stmt)
require.NoError(t, err)

View File

@ -28,29 +28,29 @@ import (
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
)
var (
Tables = []*types.Table{
&types.TableIPLDBlock,
&types.TableNodeInfo,
&types.TableHeader,
&types.TableStateNode,
&types.TableStorageNode,
&types.TableUncle,
&types.TableTransaction,
&types.TableReceipt,
&types.TableLog,
Tables = []*schema.Table{
&schema.TableIPLDBlock,
&schema.TableNodeInfo,
&schema.TableHeader,
&schema.TableStateNode,
&schema.TableStorageNode,
&schema.TableUncle,
&schema.TableTransaction,
&schema.TableReceipt,
&schema.TableLog,
}
)
type tableRow struct {
table types.Table
table schema.Table
values []interface{}
}
@ -90,7 +90,7 @@ func newFileWriter(path string) (ret fileWriter, err error) {
return
}
func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
func makeFileWriters(dir string, tables []*schema.Table) (fileWriters, error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, err
}
@ -105,7 +105,7 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
return writers, nil
}
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
func (tx fileWriters) write(tbl *schema.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
}
@ -204,13 +204,13 @@ func (csw *CSVWriter) Close() error {
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
var values []interface{}
values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
csw.rows <- tableRow{types.TableNodeInfo, values}
csw.rows <- tableRow{schema.TableNodeInfo, values}
}
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
var values []interface{}
values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
csw.rows <- tableRow{types.TableIPLDBlock, values}
csw.rows <- tableRow{schema.TableIPLDBlock, values}
}
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
@ -234,7 +234,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID,
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase)
csw.rows <- tableRow{types.TableHeader, values}
csw.rows <- tableRow{schema.TableHeader, values}
indexerMetrics.blocks.Inc(1)
}
@ -242,14 +242,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
var values []interface{}
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.Index)
csw.rows <- tableRow{types.TableUncle, values}
csw.rows <- tableRow{schema.TableUncle, values}
}
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
var values []interface{}
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value)
csw.rows <- tableRow{types.TableTransaction, values}
csw.rows <- tableRow{schema.TableTransaction, values}
indexerMetrics.transactions.Inc(1)
}
@ -257,7 +257,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{}
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus)
csw.rows <- tableRow{types.TableReceipt, values}
csw.rows <- tableRow{schema.TableReceipt, values}
indexerMetrics.receipts.Inc(1)
}
@ -266,7 +266,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
var values []interface{}
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3)
csw.rows <- tableRow{types.TableLog, values}
csw.rows <- tableRow{schema.TableLog, values}
indexerMetrics.logs.Inc(1)
}
}
@ -280,7 +280,7 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID,
true, stateNode.Balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.rows <- tableRow{types.TableStateNode, values}
csw.rows <- tableRow{schema.TableStateNode, values}
}
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
@ -292,7 +292,7 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{types.TableStorageNode, values}
csw.rows <- tableRow{schema.TableStorageNode, values}
}
// LoadWatchedAddresses loads watched addresses from a file
@ -332,7 +332,7 @@ func (csw *CSVWriter) insertWatchedAddresses(args []sdtypes.WatchAddressArg, cur
var values []interface{}
values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
row := types.TableWatchedAddresses.ToCsvRow(values...)
row := schema.TableWatchedAddresses.ToCsvRow(values...)
// writing directly instead of using rows channel as it needs to be flushed immediately
err = csw.watchedAddressesWriter.Write(row)
@ -375,7 +375,7 @@ func (csw *CSVWriter) removeWatchedAddresses(args []sdtypes.WatchAddressArg) err
func (csw *CSVWriter) setWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
var rows [][]string
for _, arg := range args {
row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
row := schema.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
rows = append(rows, row)
}

View File

@ -1,154 +0,0 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package types
var TableIPLDBlock = Table{
`ipld.blocks`,
[]column{
{name: "block_number", dbType: bigint},
{name: "key", dbType: text},
{name: "data", dbType: bytea},
},
}
var TableNodeInfo = Table{
Name: `public.nodes`,
Columns: []column{
{name: "genesis_block", dbType: varchar},
{name: "network_id", dbType: varchar},
{name: "node_id", dbType: varchar},
{name: "client_name", dbType: varchar},
{name: "chain_id", dbType: integer},
},
}
var TableHeader = Table{
"eth.header_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "block_hash", dbType: varchar},
{name: "parent_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "td", dbType: numeric},
{name: "node_ids", dbType: varchar, isArray: true},
{name: "reward", dbType: numeric},
{name: "state_root", dbType: varchar},
{name: "tx_root", dbType: varchar},
{name: "receipt_root", dbType: varchar},
{name: "uncles_hash", dbType: varchar},
{name: "bloom", dbType: bytea},
{name: "timestamp", dbType: numeric},
{name: "coinbase", dbType: varchar},
},
}
var TableStateNode = Table{
"eth.state_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "state_leaf_key", dbType: varchar},
{name: "cid", dbType: text},
{name: "removed", dbType: boolean},
{name: "diff", dbType: boolean},
{name: "balance", dbType: numeric},
{name: "nonce", dbType: bigint},
{name: "code_hash", dbType: varchar},
{name: "storage_root", dbType: varchar},
},
}
var TableStorageNode = Table{
"eth.storage_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "state_leaf_key", dbType: varchar},
{name: "storage_leaf_key", dbType: varchar},
{name: "cid", dbType: text},
{name: "removed", dbType: boolean},
{name: "diff", dbType: boolean},
{name: "val", dbType: bytea},
},
}
var TableUncle = Table{
"eth.uncle_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "block_hash", dbType: varchar},
{name: "header_id", dbType: varchar},
{name: "parent_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "reward", dbType: numeric},
{name: "index", dbType: integer},
},
}
var TableTransaction = Table{
"eth.transaction_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "tx_hash", dbType: varchar},
{name: "cid", dbType: text},
{name: "dst", dbType: varchar},
{name: "src", dbType: varchar},
{name: "index", dbType: integer},
{name: "tx_type", dbType: integer},
{name: "value", dbType: numeric},
},
}
var TableReceipt = Table{
"eth.receipt_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "tx_id", dbType: varchar},
{name: "cid", dbType: text},
{name: "contract", dbType: varchar},
{name: "post_state", dbType: varchar},
{name: "post_status", dbType: integer},
},
}
var TableLog = Table{
"eth.log_cids",
[]column{
{name: "block_number", dbType: bigint},
{name: "header_id", dbType: varchar},
{name: "cid", dbType: text},
{name: "rct_id", dbType: varchar},
{name: "address", dbType: varchar},
{name: "index", dbType: integer},
{name: "topic0", dbType: varchar},
{name: "topic1", dbType: varchar},
{name: "topic2", dbType: varchar},
{name: "topic3", dbType: varchar},
},
}
var TableWatchedAddresses = Table{
"eth_meta.watched_addresses",
[]column{
{name: "address", dbType: varchar},
{name: "created_at", dbType: bigint},
{name: "watched_at", dbType: bigint},
{name: "last_filled_at", dbType: bigint},
},
}

View File

@ -1,104 +0,0 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package types
import (
"fmt"
"strings"
"github.com/thoas/go-funk"
)
type colType int
const (
integer colType = iota
boolean
bigint
numeric
bytea
varchar
text
)
type column struct {
name string
dbType colType
isArray bool
}
type Table struct {
Name string
Columns []column
}
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.dbType.formatter()(args[i])
if col.isArray {
valueList := funk.Map(args[i], col.dbType.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col column) bool {
return col.dbType == varchar
}).([]column)
columnNames := funk.Map(columns, func(col column) string {
return col.name
}).([]string)
return columnNames
}
type colfmt = func(interface{}) string
func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}
func (typ colType) formatter() colfmt {
switch typ {
case integer:
return sprintf("%d")
case boolean:
return func(x interface{}) string {
if x.(bool) {
return "t"
}
return "f"
}
case bigint:
return sprintf("%s")
case numeric:
return sprintf("%s")
case bytea:
return sprintf(`\x%x`)
case varchar:
return sprintf("%s")
case text:
return sprintf("%s")
}
panic("unreachable")
}

View File

@ -0,0 +1,173 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package schema
var TableIPLDBlock = Table{
Name: `ipld.blocks`,
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "key", Type: Dtext},
{Name: "data", Type: Dbytea},
},
}
var TableNodeInfo = Table{
Name: `public.nodes`,
Columns: []Column{
{Name: "genesis_block", Type: Dvarchar},
{Name: "network_id", Type: Dvarchar},
{Name: "node_id", Type: Dvarchar},
{Name: "client_name", Type: Dvarchar},
{Name: "chain_id", Type: Dinteger},
},
}
var TableHeader = Table{
Name: "eth.header_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "block_hash", Type: Dvarchar},
{Name: "parent_hash", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "td", Type: Dnumeric},
{Name: "node_ids", Type: Dvarchar, Array: true},
{Name: "reward", Type: Dnumeric},
{Name: "state_root", Type: Dvarchar},
{Name: "tx_root", Type: Dvarchar},
{Name: "receipt_root", Type: Dvarchar},
{Name: "uncles_hash", Type: Dvarchar},
{Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric},
{Name: "coinbase", Type: Dvarchar},
},
UpsertClause: OnConflict("block_number", "block_hash").Set(
"parent_hash",
"cid",
"td",
"node_ids",
"reward",
"state_root",
"tx_root",
"receipt_root",
"uncles_hash",
"bloom",
"timestamp",
"coinbase",
)}
var TableStateNode = Table{
Name: "eth.state_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "state_leaf_key", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "diff", Type: Dboolean},
{Name: "balance", Type: Dnumeric},
{Name: "nonce", Type: Dbigint},
{Name: "code_hash", Type: Dvarchar},
{Name: "storage_root", Type: Dvarchar},
{Name: "removed", Type: Dboolean},
},
UpsertClause: OnConflict("block_number", "header_id", "state_leaf_key"),
}
var TableStorageNode = Table{
Name: "eth.storage_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "state_leaf_key", Type: Dvarchar},
{Name: "storage_leaf_key", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "diff", Type: Dboolean},
{Name: "val", Type: Dbytea},
{Name: "removed", Type: Dboolean},
},
UpsertClause: OnConflict("block_number", "header_id", "state_leaf_key", "storage_leaf_key"),
}
var TableUncle = Table{
Name: "eth.uncle_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "block_hash", Type: Dvarchar},
{Name: "header_id", Type: Dvarchar},
{Name: "parent_hash", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "reward", Type: Dnumeric},
{Name: "index", Type: Dinteger},
},
UpsertClause: OnConflict("block_number", "block_hash"),
}
var TableTransaction = Table{
Name: "eth.transaction_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "tx_hash", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "dst", Type: Dvarchar},
{Name: "src", Type: Dvarchar},
{Name: "index", Type: Dinteger},
{Name: "tx_type", Type: Dinteger},
{Name: "value", Type: Dnumeric},
},
UpsertClause: OnConflict("block_number", "header_id", "tx_hash"),
}
var TableReceipt = Table{
Name: "eth.receipt_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "tx_id", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "contract", Type: Dvarchar},
{Name: "post_state", Type: Dvarchar},
{Name: "post_status", Type: Dinteger},
},
UpsertClause: OnConflict("block_number", "header_id", "tx_id"),
}
var TableLog = Table{
Name: "eth.log_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "rct_id", Type: Dvarchar},
{Name: "address", Type: Dvarchar},
{Name: "index", Type: Dinteger},
{Name: "topic0", Type: Dvarchar},
{Name: "topic1", Type: Dvarchar},
{Name: "topic2", Type: Dvarchar},
{Name: "topic3", Type: Dvarchar},
},
UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"),
}
var TableWatchedAddresses = Table{
Name: "eth_meta.watched_addresses",
Columns: []Column{
{Name: "address", Type: Dvarchar},
{Name: "created_at", Type: Dbigint},
{Name: "watched_at", Type: Dbigint},
{Name: "last_filled_at", Type: Dbigint},
},
}

View File

@ -0,0 +1,147 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package schema
import (
"fmt"
"strings"
"github.com/thoas/go-funk"
)
type colType int
const (
Dinteger colType = iota
Dboolean
Dbigint
Dnumeric
Dbytea
Dvarchar
Dtext
)
type ConflictClause struct {
Target []string
Update []string
}
type Column struct {
Name string
Type colType
Array bool
}
type Table struct {
Name string
Columns []Column
UpsertClause ConflictClause
}
type colfmt = func(interface{}) string
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.Type.formatter()(args[i])
if col.Array {
valueList := funk.Map(args[i], col.Type.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col Column) bool {
return col.Type == Dvarchar
}).([]Column)
columnNames := funk.Map(columns, func(col Column) string {
return col.Name
}).([]string)
return columnNames
}
func OnConflict(target ...string) ConflictClause {
return ConflictClause{Target: target}
}
func (c ConflictClause) Set(fields ...string) ConflictClause {
c.Update = fields
return c
}
// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table
// using positional placeholders
func (tbl *Table) ToInsertStatement(upsert bool) string {
var colnames, placeholders []string
for i, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", "))
if upsert && len(tbl.UpsertClause.Update) != 0 {
var update_placeholders []string
for _, name := range tbl.UpsertClause.Update {
i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name })
update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1))
}
suffix += fmt.Sprintf(
" DO UPDATE SET (%s) = (%s)",
strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "),
)
} else {
suffix += " DO NOTHING"
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s) %s",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix,
)
}
func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}
func (typ colType) formatter() colfmt {
switch typ {
case Dinteger:
return sprintf("%d")
case Dboolean:
return func(x interface{}) string {
if x.(bool) {
return "t"
}
return "f"
}
case Dbigint:
return sprintf("%s")
case Dnumeric:
return sprintf("%s")
case Dbytea:
return sprintf(`\x%x`)
case Dvarchar:
return sprintf("%s")
case Dtext:
return sprintf("%s")
}
panic("unreachable")
}

View File

@ -0,0 +1,56 @@
package schema_test
import (
"testing"
"github.com/stretchr/testify/require"
. "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema"
)
var testHeaderTable = Table{
Name: "eth.header_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "block_hash", Type: Dvarchar},
{Name: "parent_hash", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "td", Type: Dnumeric},
{Name: "node_id", Type: Dvarchar},
{Name: "reward", Type: Dnumeric},
{Name: "state_root", Type: Dvarchar},
{Name: "tx_root", Type: Dvarchar},
{Name: "receipt_root", Type: Dvarchar},
{Name: "uncle_root", Type: Dvarchar},
{Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric},
{Name: "mh_key", Type: Dtext},
{Name: "times_validated", Type: Dinteger},
{Name: "coinbase", Type: Dvarchar},
},
UpsertClause: OnConflict("block_hash", "block_number").Set(
"parent_hash",
"cid",
"td",
"node_id",
"reward",
"state_root",
"tx_root",
"receipt_root",
"uncle_root",
"bloom",
"timestamp",
"mh_key",
"times_validated",
"coinbase",
)}
func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`
headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING`
require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false))
require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true))
}