diff --git a/statediff/indexer/database/file/csv_indexer_legacy_test.go b/statediff/indexer/database/file/csv_indexer_legacy_test.go index 55350a912..f16926d95 100644 --- a/statediff/indexer/database/file/csv_indexer_legacy_test.go +++ b/statediff/indexer/database/file/csv_indexer_legacy_test.go @@ -28,8 +28,8 @@ import ( "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/statediff/indexer/database/file" - "github.com/ethereum/go-ethereum/statediff/indexer/database/file/types" "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" "github.com/ethereum/go-ethereum/statediff/indexer/test" "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" ) @@ -90,7 +90,7 @@ func resetAndDumpWatchedAddressesCSVFileData(t *testing.T) { test_helpers.TearDownDB(t, db) outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath) - stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath) + stmt := fmt.Sprintf(pgCopyStatement, schema.TableWatchedAddresses.Name, outputFilePath) _, err = db.Exec(context.Background(), stmt) require.NoError(t, err) diff --git a/statediff/indexer/database/file/csv_writer.go b/statediff/indexer/database/file/csv_writer.go index 3bdcc5bc2..1d6816395 100644 --- a/statediff/indexer/database/file/csv_writer.go +++ b/statediff/indexer/database/file/csv_writer.go @@ -28,29 +28,29 @@ import ( "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/statediff/indexer/database/file/types" "github.com/ethereum/go-ethereum/statediff/indexer/ipld" "github.com/ethereum/go-ethereum/statediff/indexer/models" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" + "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" sdtypes "github.com/ethereum/go-ethereum/statediff/types" ) var ( - Tables = []*types.Table{ - &types.TableIPLDBlock, - &types.TableNodeInfo, - &types.TableHeader, - &types.TableStateNode, - &types.TableStorageNode, - &types.TableUncle, - &types.TableTransaction, - &types.TableReceipt, - &types.TableLog, + Tables = []*schema.Table{ + &schema.TableIPLDBlock, + &schema.TableNodeInfo, + &schema.TableHeader, + &schema.TableStateNode, + &schema.TableStorageNode, + &schema.TableUncle, + &schema.TableTransaction, + &schema.TableReceipt, + &schema.TableLog, } ) type tableRow struct { - table types.Table + table schema.Table values []interface{} } @@ -90,7 +90,7 @@ func newFileWriter(path string) (ret fileWriter, err error) { return } -func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) { +func makeFileWriters(dir string, tables []*schema.Table) (fileWriters, error) { if err := os.MkdirAll(dir, 0755); err != nil { return nil, err } @@ -105,7 +105,7 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) { return writers, nil } -func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error { +func (tx fileWriters) write(tbl *schema.Table, args ...interface{}) error { row := tbl.ToCsvRow(args...) return tx[tbl.Name].Write(row) } @@ -204,13 +204,13 @@ func (csw *CSVWriter) Close() error { func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { var values []interface{} values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) - csw.rows <- tableRow{types.TableNodeInfo, values} + csw.rows <- tableRow{schema.TableNodeInfo, values} } func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) { var values []interface{} values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data) - csw.rows <- tableRow{types.TableIPLDBlock, values} + csw.rows <- tableRow{schema.TableIPLDBlock, values} } func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { @@ -234,7 +234,7 @@ func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase) - csw.rows <- tableRow{types.TableHeader, values} + csw.rows <- tableRow{schema.TableHeader, values} indexerMetrics.blocks.Inc(1) } @@ -242,14 +242,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { var values []interface{} values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.Index) - csw.rows <- tableRow{types.TableUncle, values} + csw.rows <- tableRow{schema.TableUncle, values} } func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { var values []interface{} values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value) - csw.rows <- tableRow{types.TableTransaction, values} + csw.rows <- tableRow{schema.TableTransaction, values} indexerMetrics.transactions.Inc(1) } @@ -257,7 +257,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { var values []interface{} values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus) - csw.rows <- tableRow{types.TableReceipt, values} + csw.rows <- tableRow{schema.TableReceipt, values} indexerMetrics.receipts.Inc(1) } @@ -266,7 +266,7 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { var values []interface{} values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3) - csw.rows <- tableRow{types.TableLog, values} + csw.rows <- tableRow{schema.TableLog, values} indexerMetrics.logs.Inc(1) } } @@ -280,7 +280,7 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { var values []interface{} values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, true, stateNode.Balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) - csw.rows <- tableRow{types.TableStateNode, values} + csw.rows <- tableRow{schema.TableStateNode, values} } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { @@ -292,7 +292,7 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { var values []interface{} values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageKey, storageCID.CID, true, storageCID.Value, storageCID.Removed) - csw.rows <- tableRow{types.TableStorageNode, values} + csw.rows <- tableRow{schema.TableStorageNode, values} } // LoadWatchedAddresses loads watched addresses from a file @@ -332,7 +332,7 @@ func (csw *CSVWriter) insertWatchedAddresses(args []sdtypes.WatchAddressArg, cur var values []interface{} values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0") - row := types.TableWatchedAddresses.ToCsvRow(values...) + row := schema.TableWatchedAddresses.ToCsvRow(values...) // writing directly instead of using rows channel as it needs to be flushed immediately err = csw.watchedAddressesWriter.Write(row) @@ -375,7 +375,7 @@ func (csw *CSVWriter) removeWatchedAddresses(args []sdtypes.WatchAddressArg) err func (csw *CSVWriter) setWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { var rows [][]string for _, arg := range args { - row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0") + row := schema.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0") rows = append(rows, row) } diff --git a/statediff/indexer/database/file/types/schema.go b/statediff/indexer/database/file/types/schema.go deleted file mode 100644 index a7e2823fc..000000000 --- a/statediff/indexer/database/file/types/schema.go +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright 2022 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package types - -var TableIPLDBlock = Table{ - `ipld.blocks`, - []column{ - {name: "block_number", dbType: bigint}, - {name: "key", dbType: text}, - {name: "data", dbType: bytea}, - }, -} - -var TableNodeInfo = Table{ - Name: `public.nodes`, - Columns: []column{ - {name: "genesis_block", dbType: varchar}, - {name: "network_id", dbType: varchar}, - {name: "node_id", dbType: varchar}, - {name: "client_name", dbType: varchar}, - {name: "chain_id", dbType: integer}, - }, -} - -var TableHeader = Table{ - "eth.header_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "block_hash", dbType: varchar}, - {name: "parent_hash", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "td", dbType: numeric}, - {name: "node_ids", dbType: varchar, isArray: true}, - {name: "reward", dbType: numeric}, - {name: "state_root", dbType: varchar}, - {name: "tx_root", dbType: varchar}, - {name: "receipt_root", dbType: varchar}, - {name: "uncles_hash", dbType: varchar}, - {name: "bloom", dbType: bytea}, - {name: "timestamp", dbType: numeric}, - {name: "coinbase", dbType: varchar}, - }, -} - -var TableStateNode = Table{ - "eth.state_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "header_id", dbType: varchar}, - {name: "state_leaf_key", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "removed", dbType: boolean}, - {name: "diff", dbType: boolean}, - {name: "balance", dbType: numeric}, - {name: "nonce", dbType: bigint}, - {name: "code_hash", dbType: varchar}, - {name: "storage_root", dbType: varchar}, - }, -} - -var TableStorageNode = Table{ - "eth.storage_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "header_id", dbType: varchar}, - {name: "state_leaf_key", dbType: varchar}, - {name: "storage_leaf_key", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "removed", dbType: boolean}, - {name: "diff", dbType: boolean}, - {name: "val", dbType: bytea}, - }, -} - -var TableUncle = Table{ - "eth.uncle_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "block_hash", dbType: varchar}, - {name: "header_id", dbType: varchar}, - {name: "parent_hash", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "reward", dbType: numeric}, - {name: "index", dbType: integer}, - }, -} - -var TableTransaction = Table{ - "eth.transaction_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "header_id", dbType: varchar}, - {name: "tx_hash", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "dst", dbType: varchar}, - {name: "src", dbType: varchar}, - {name: "index", dbType: integer}, - {name: "tx_type", dbType: integer}, - {name: "value", dbType: numeric}, - }, -} - -var TableReceipt = Table{ - "eth.receipt_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "header_id", dbType: varchar}, - {name: "tx_id", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "contract", dbType: varchar}, - {name: "post_state", dbType: varchar}, - {name: "post_status", dbType: integer}, - }, -} - -var TableLog = Table{ - "eth.log_cids", - []column{ - {name: "block_number", dbType: bigint}, - {name: "header_id", dbType: varchar}, - {name: "cid", dbType: text}, - {name: "rct_id", dbType: varchar}, - {name: "address", dbType: varchar}, - {name: "index", dbType: integer}, - {name: "topic0", dbType: varchar}, - {name: "topic1", dbType: varchar}, - {name: "topic2", dbType: varchar}, - {name: "topic3", dbType: varchar}, - }, -} - -var TableWatchedAddresses = Table{ - "eth_meta.watched_addresses", - []column{ - {name: "address", dbType: varchar}, - {name: "created_at", dbType: bigint}, - {name: "watched_at", dbType: bigint}, - {name: "last_filled_at", dbType: bigint}, - }, -} diff --git a/statediff/indexer/database/file/types/table.go b/statediff/indexer/database/file/types/table.go deleted file mode 100644 index d7fd5af6c..000000000 --- a/statediff/indexer/database/file/types/table.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2022 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package types - -import ( - "fmt" - "strings" - - "github.com/thoas/go-funk" -) - -type colType int - -const ( - integer colType = iota - boolean - bigint - numeric - bytea - varchar - text -) - -type column struct { - name string - dbType colType - isArray bool -} -type Table struct { - Name string - Columns []column -} - -func (tbl *Table) ToCsvRow(args ...interface{}) []string { - var row []string - for i, col := range tbl.Columns { - value := col.dbType.formatter()(args[i]) - - if col.isArray { - valueList := funk.Map(args[i], col.dbType.formatter()).([]string) - value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) - } - - row = append(row, value) - } - return row -} - -func (tbl *Table) VarcharColumns() []string { - columns := funk.Filter(tbl.Columns, func(col column) bool { - return col.dbType == varchar - }).([]column) - - columnNames := funk.Map(columns, func(col column) string { - return col.name - }).([]string) - - return columnNames -} - -type colfmt = func(interface{}) string - -func sprintf(f string) colfmt { - return func(x interface{}) string { return fmt.Sprintf(f, x) } -} - -func (typ colType) formatter() colfmt { - switch typ { - case integer: - return sprintf("%d") - case boolean: - return func(x interface{}) string { - if x.(bool) { - return "t" - } - return "f" - } - case bigint: - return sprintf("%s") - case numeric: - return sprintf("%s") - case bytea: - return sprintf(`\x%x`) - case varchar: - return sprintf("%s") - case text: - return sprintf("%s") - } - panic("unreachable") -} diff --git a/statediff/indexer/shared/schema/schema.go b/statediff/indexer/shared/schema/schema.go new file mode 100644 index 000000000..151672790 --- /dev/null +++ b/statediff/indexer/shared/schema/schema.go @@ -0,0 +1,173 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package schema + +var TableIPLDBlock = Table{ + Name: `ipld.blocks`, + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "key", Type: Dtext}, + {Name: "data", Type: Dbytea}, + }, +} + +var TableNodeInfo = Table{ + Name: `public.nodes`, + Columns: []Column{ + {Name: "genesis_block", Type: Dvarchar}, + {Name: "network_id", Type: Dvarchar}, + {Name: "node_id", Type: Dvarchar}, + {Name: "client_name", Type: Dvarchar}, + {Name: "chain_id", Type: Dinteger}, + }, +} + +var TableHeader = Table{ + Name: "eth.header_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "block_hash", Type: Dvarchar}, + {Name: "parent_hash", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "td", Type: Dnumeric}, + {Name: "node_ids", Type: Dvarchar, Array: true}, + {Name: "reward", Type: Dnumeric}, + {Name: "state_root", Type: Dvarchar}, + {Name: "tx_root", Type: Dvarchar}, + {Name: "receipt_root", Type: Dvarchar}, + {Name: "uncles_hash", Type: Dvarchar}, + {Name: "bloom", Type: Dbytea}, + {Name: "timestamp", Type: Dnumeric}, + {Name: "coinbase", Type: Dvarchar}, + }, + UpsertClause: OnConflict("block_number", "block_hash").Set( + "parent_hash", + "cid", + "td", + "node_ids", + "reward", + "state_root", + "tx_root", + "receipt_root", + "uncles_hash", + "bloom", + "timestamp", + "coinbase", + )} + +var TableStateNode = Table{ + Name: "eth.state_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "state_leaf_key", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "diff", Type: Dboolean}, + {Name: "balance", Type: Dnumeric}, + {Name: "nonce", Type: Dbigint}, + {Name: "code_hash", Type: Dvarchar}, + {Name: "storage_root", Type: Dvarchar}, + {Name: "removed", Type: Dboolean}, + }, + UpsertClause: OnConflict("block_number", "header_id", "state_leaf_key"), +} + +var TableStorageNode = Table{ + Name: "eth.storage_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "state_leaf_key", Type: Dvarchar}, + {Name: "storage_leaf_key", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "diff", Type: Dboolean}, + {Name: "val", Type: Dbytea}, + {Name: "removed", Type: Dboolean}, + }, + UpsertClause: OnConflict("block_number", "header_id", "state_leaf_key", "storage_leaf_key"), +} + +var TableUncle = Table{ + Name: "eth.uncle_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "block_hash", Type: Dvarchar}, + {Name: "header_id", Type: Dvarchar}, + {Name: "parent_hash", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "reward", Type: Dnumeric}, + {Name: "index", Type: Dinteger}, + }, + UpsertClause: OnConflict("block_number", "block_hash"), +} + +var TableTransaction = Table{ + Name: "eth.transaction_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "tx_hash", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "dst", Type: Dvarchar}, + {Name: "src", Type: Dvarchar}, + {Name: "index", Type: Dinteger}, + {Name: "tx_type", Type: Dinteger}, + {Name: "value", Type: Dnumeric}, + }, + UpsertClause: OnConflict("block_number", "header_id", "tx_hash"), +} + +var TableReceipt = Table{ + Name: "eth.receipt_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "tx_id", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "contract", Type: Dvarchar}, + {Name: "post_state", Type: Dvarchar}, + {Name: "post_status", Type: Dinteger}, + }, + UpsertClause: OnConflict("block_number", "header_id", "tx_id"), +} + +var TableLog = Table{ + Name: "eth.log_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "rct_id", Type: Dvarchar}, + {Name: "address", Type: Dvarchar}, + {Name: "index", Type: Dinteger}, + {Name: "topic0", Type: Dvarchar}, + {Name: "topic1", Type: Dvarchar}, + {Name: "topic2", Type: Dvarchar}, + {Name: "topic3", Type: Dvarchar}, + }, + UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"), +} + +var TableWatchedAddresses = Table{ + Name: "eth_meta.watched_addresses", + Columns: []Column{ + {Name: "address", Type: Dvarchar}, + {Name: "created_at", Type: Dbigint}, + {Name: "watched_at", Type: Dbigint}, + {Name: "last_filled_at", Type: Dbigint}, + }, +} diff --git a/statediff/indexer/shared/schema/table.go b/statediff/indexer/shared/schema/table.go new file mode 100644 index 000000000..9bc19ac3d --- /dev/null +++ b/statediff/indexer/shared/schema/table.go @@ -0,0 +1,147 @@ +// Copyright 2022 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package schema + +import ( + "fmt" + "strings" + + "github.com/thoas/go-funk" +) + +type colType int + +const ( + Dinteger colType = iota + Dboolean + Dbigint + Dnumeric + Dbytea + Dvarchar + Dtext +) + +type ConflictClause struct { + Target []string + Update []string +} + +type Column struct { + Name string + Type colType + Array bool +} +type Table struct { + Name string + Columns []Column + + UpsertClause ConflictClause +} + +type colfmt = func(interface{}) string + +func (tbl *Table) ToCsvRow(args ...interface{}) []string { + var row []string + for i, col := range tbl.Columns { + value := col.Type.formatter()(args[i]) + + if col.Array { + valueList := funk.Map(args[i], col.Type.formatter()).([]string) + value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) + } + + row = append(row, value) + } + return row +} + +func (tbl *Table) VarcharColumns() []string { + columns := funk.Filter(tbl.Columns, func(col Column) bool { + return col.Type == Dvarchar + }).([]Column) + + columnNames := funk.Map(columns, func(col Column) string { + return col.Name + }).([]string) + return columnNames +} + +func OnConflict(target ...string) ConflictClause { + return ConflictClause{Target: target} +} +func (c ConflictClause) Set(fields ...string) ConflictClause { + c.Update = fields + return c +} + +// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table +// using positional placeholders +func (tbl *Table) ToInsertStatement(upsert bool) string { + var colnames, placeholders []string + for i, col := range tbl.Columns { + colnames = append(colnames, col.Name) + placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) + } + suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", ")) + if upsert && len(tbl.UpsertClause.Update) != 0 { + var update_placeholders []string + for _, name := range tbl.UpsertClause.Update { + i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name }) + update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1)) + } + suffix += fmt.Sprintf( + " DO UPDATE SET (%s) = (%s)", + strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "), + ) + } else { + suffix += " DO NOTHING" + } + + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s) %s", + tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix, + ) +} + +func sprintf(f string) colfmt { + return func(x interface{}) string { return fmt.Sprintf(f, x) } +} + +func (typ colType) formatter() colfmt { + switch typ { + case Dinteger: + return sprintf("%d") + case Dboolean: + return func(x interface{}) string { + if x.(bool) { + return "t" + } + return "f" + } + case Dbigint: + return sprintf("%s") + case Dnumeric: + return sprintf("%s") + case Dbytea: + return sprintf(`\x%x`) + case Dvarchar: + return sprintf("%s") + case Dtext: + return sprintf("%s") + } + panic("unreachable") +} diff --git a/statediff/indexer/shared/schema/table_test.go b/statediff/indexer/shared/schema/table_test.go new file mode 100644 index 000000000..b38ef6e07 --- /dev/null +++ b/statediff/indexer/shared/schema/table_test.go @@ -0,0 +1,56 @@ +package schema_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + . "github.com/ethereum/go-ethereum/statediff/indexer/shared/schema" +) + +var testHeaderTable = Table{ + Name: "eth.header_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "block_hash", Type: Dvarchar}, + {Name: "parent_hash", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "td", Type: Dnumeric}, + {Name: "node_id", Type: Dvarchar}, + {Name: "reward", Type: Dnumeric}, + {Name: "state_root", Type: Dvarchar}, + {Name: "tx_root", Type: Dvarchar}, + {Name: "receipt_root", Type: Dvarchar}, + {Name: "uncle_root", Type: Dvarchar}, + {Name: "bloom", Type: Dbytea}, + {Name: "timestamp", Type: Dnumeric}, + {Name: "mh_key", Type: Dtext}, + {Name: "times_validated", Type: Dinteger}, + {Name: "coinbase", Type: Dvarchar}, + }, + UpsertClause: OnConflict("block_hash", "block_number").Set( + "parent_hash", + "cid", + "td", + "node_id", + "reward", + "state_root", + "tx_root", + "receipt_root", + "uncle_root", + "bloom", + "timestamp", + "mh_key", + "times_validated", + "coinbase", + )} + +func TestTable(t *testing.T) { + + headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` + + headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING` + + require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false)) + require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true)) +}