Add mode to write to CSV files in statediff file writer #249
@ -24,4 +24,4 @@ services:
|
||||
ports:
|
||||
- "127.0.0.1:8077:5432"
|
||||
volumes:
|
||||
- ./statediff/indexer/database/file:/file
|
||||
- ./statediff/indexer/database/file:/file_indexer
|
||||
|
@ -59,17 +59,26 @@ func (c Config) Type() shared.DBType {
|
||||
return shared.FILE
|
||||
}
|
||||
|
||||
// TestConfig config for unit tests
|
||||
var TestConfig = Config{
|
||||
var nodeInfo = node.Info{
|
||||
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
|
||||
NetworkID: "1",
|
||||
ChainID: 1,
|
||||
ID: "mockNodeID",
|
||||
ClientName: "go-ethereum",
|
||||
}
|
||||
|
||||
// CSVTestConfig config for unit tests
|
||||
var CSVTestConfig = Config{
|
||||
Mode: CSV,
|
||||
OutputDir: "./statediffing_test",
|
||||
FilePath: "./statediffing_test_file.sql",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
|
||||
NodeInfo: node.Info{
|
||||
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
|
||||
NetworkID: "1",
|
||||
ChainID: 1,
|
||||
ID: "mockNodeID",
|
||||
ClientName: "go-ethereum",
|
||||
},
|
||||
NodeInfo: nodeInfo,
|
||||
}
|
||||
|
||||
// SQLTestConfig config for unit tests
|
||||
var SQLTestConfig = Config{
|
||||
Mode: SQL,
|
||||
FilePath: "./statediffing_test_file.sql",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
|
||||
NodeInfo: nodeInfo,
|
||||
}
|
||||
|
@ -30,27 +30,26 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
const dbDirectory = "/file"
|
||||
const dbDirectory = "/file_indexer"
|
||||
const pgCopyStatement = `COPY %s FROM '%s' CSV`
|
||||
|
||||
func setupCSVLegacy(t *testing.T) {
|
||||
mockLegacyBlock = legacyData.MockBlock
|
||||
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
|
||||
file.TestConfig.Mode = file.CSV
|
||||
file.TestConfig.OutputDir = "./statediffing_legacy_test"
|
||||
file.CSVTestConfig.OutputDir = "./statediffing_legacy_test"
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.TestConfig.OutputDir)
|
||||
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
@ -67,6 +66,7 @@ func setupCSVLegacy(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, node := range legacyData.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
@ -75,7 +75,6 @@ func setupCSVLegacy(t *testing.T) {
|
||||
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
@ -83,41 +82,42 @@ func setupCSVLegacy(t *testing.T) {
|
||||
}
|
||||
|
||||
func dumpCSVFileData(t *testing.T) {
|
||||
outputDir := filepath.Join(dbDirectory, file.TestConfig.OutputDir)
|
||||
outputDir := filepath.Join(dbDirectory, file.CSVTestConfig.OutputDir)
|
||||
|
||||
for _, tbl := range file.Tables {
|
||||
stm := fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFile(outputDir, tbl.Name))
|
||||
|
||||
var stmt string
|
||||
varcharColumns := tbl.VarcharColumns()
|
||||
if len(varcharColumns) > 0 {
|
||||
stm = fmt.Sprintf(
|
||||
stmt = fmt.Sprintf(
|
||||
pgCopyStatement+" FORCE NOT NULL %s",
|
||||
tbl.Name,
|
||||
file.TableFile(outputDir, tbl.Name),
|
||||
file.TableFilePath(outputDir, tbl.Name),
|
||||
strings.Join(varcharColumns, ", "),
|
||||
)
|
||||
} else {
|
||||
stmt = fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFilePath(outputDir, tbl.Name))
|
||||
}
|
||||
|
||||
_, err = sqlxdb.Exec(stm)
|
||||
_, err = sqlxdb.Exec(stmt)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func dumpWatchedAddressesCSVFileData(t *testing.T) {
|
||||
outputFilePath := filepath.Join(dbDirectory, file.TestConfig.WatchedAddressesFilePath)
|
||||
stm := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
|
||||
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
|
||||
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
|
||||
|
||||
_, err = sqlxdb.Exec(stm)
|
||||
_, err = sqlxdb.Exec(stmt)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func tearDownCSV(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
|
||||
err := os.RemoveAll(file.TestConfig.OutputDir)
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
if err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -44,21 +44,19 @@ import (
|
||||
)
|
||||
|
||||
func setupCSVIndexer(t *testing.T) {
|
||||
file.TestConfig.Mode = file.CSV
|
||||
file.TestConfig.OutputDir = "./statediffing_test"
|
||||
file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.csv"
|
||||
file.CSVTestConfig.OutputDir = "./statediffing_test"
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.TestConfig.OutputDir)
|
||||
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.WatchedAddressesFilePath)
|
||||
if _, err := os.Stat(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
|
@ -31,10 +31,11 @@ import (
|
||||
"github.com/thoas/go-funk"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -100,7 +101,7 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
|
||||
}
|
||||
writers := fileWriters{}
|
||||
for _, tbl := range tables {
|
||||
w, err := newFileWriter(TableFile(dir, tbl.Name))
|
||||
w, err := newFileWriter(TableFilePath(dir, tbl.Name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -193,7 +194,7 @@ func (csw *CSVWriter) Flush() {
|
||||
<-csw.flushFinished
|
||||
}
|
||||
|
||||
func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv") }
|
||||
func TableFilePath(dir, name string) string { return filepath.Join(dir, name+".csv") }
|
||||
|
||||
// Close satisfies io.Closer
|
||||
func (csw *CSVWriter) Close() error {
|
||||
@ -345,7 +346,7 @@ func (csw *CSVWriter) loadWatchedAddresses() ([]common.Address, error) {
|
||||
}
|
||||
|
||||
// InsertWatchedAddresses inserts the given addresses in a file
|
||||
func (csw *CSVWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
func (csw *CSVWriter) insertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
// load csv rows from watched addresses file
|
||||
watchedAddresses, err := csw.loadWatchedAddresses()
|
||||
if err != nil {
|
||||
@ -370,7 +371,7 @@ func (csw *CSVWriter) insertWatchedAddresses(args []types.WatchAddressArg, curre
|
||||
}
|
||||
}
|
||||
|
||||
// watched addresses need to be flushed immediately as they also need to be read from the file
|
||||
// watched addresses need to be flushed immediately to the file to keep them in sync with in-memory watched addresses
|
||||
csw.watchedAddressesWriter.Flush()
|
||||
err = csw.watchedAddressesWriter.Error()
|
||||
if err != nil {
|
||||
@ -381,7 +382,7 @@ func (csw *CSVWriter) insertWatchedAddresses(args []types.WatchAddressArg, curre
|
||||
}
|
||||
|
||||
// RemoveWatchedAddresses removes the given watched addresses from a file
|
||||
func (csw *CSVWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
|
||||
func (csw *CSVWriter) removeWatchedAddresses(args []sdtypes.WatchAddressArg) error {
|
||||
// load csv rows from watched addresses file
|
||||
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
|
||||
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
|
||||
@ -391,7 +392,7 @@ func (csw *CSVWriter) removeWatchedAddresses(args []types.WatchAddressArg) error
|
||||
|
||||
// get rid of rows having addresses to be removed
|
||||
filteredRows := funk.Filter(rows, func(row []string) bool {
|
||||
return !funk.Contains(args, func(arg types.WatchAddressArg) bool {
|
||||
return !funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
|
||||
// Compare first column in table for address
|
||||
return arg.Address == row[0]
|
||||
})
|
||||
@ -401,7 +402,7 @@ func (csw *CSVWriter) removeWatchedAddresses(args []types.WatchAddressArg) error
|
||||
}
|
||||
|
||||
// SetWatchedAddresses clears and inserts the given addresses in a file
|
||||
func (csw *CSVWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
func (csw *CSVWriter) setWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
var rows [][]string
|
||||
for _, arg := range args {
|
||||
row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
|
||||
|
@ -44,9 +44,10 @@ import (
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
const defaultOutputDir = "./statediff_output"
|
||||
const defaultFilePath = "./statediff.sql"
|
||||
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.csv"
|
||||
const defaultCSVOutputDir = "./statediff_output"
|
||||
const defaultSQLFilePath = "./statediff.sql"
|
||||
const defaultWatchedAddressesCSVFilePath = "./statediff-watched-addresses.csv"
|
||||
const defaultWatchedAddressesSQLFilePath = "./statediff-watched-addresses.sql"
|
||||
|
||||
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
|
||||
|
||||
@ -71,24 +72,24 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
var writer FileWriter
|
||||
|
||||
watchedAddressesFilePath := config.WatchedAddressesFilePath
|
||||
if watchedAddressesFilePath == "" {
|
||||
watchedAddressesFilePath = defaultWatchedAddressesFilePath
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
switch config.Mode {
|
||||
case CSV:
|
||||
outputDir := config.OutputDir
|
||||
if outputDir == "" {
|
||||
outputDir = defaultOutputDir
|
||||
outputDir = defaultCSVOutputDir
|
||||
}
|
||||
|
||||
if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir)
|
||||
}
|
||||
|
||||
log.Info("Writing statediff CSV files to directory", "file", outputDir)
|
||||
|
||||
if watchedAddressesFilePath == "" {
|
||||
watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -96,7 +97,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
case SQL:
|
||||
filePath := config.FilePath
|
||||
if filePath == "" {
|
||||
filePath = defaultFilePath
|
||||
filePath = defaultSQLFilePath
|
||||
}
|
||||
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
|
||||
@ -107,6 +108,11 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
}
|
||||
log.Info("Writing statediff SQL statements to file", "file", filePath)
|
||||
|
||||
if watchedAddressesFilePath == "" {
|
||||
watchedAddressesFilePath = defaultWatchedAddressesSQLFilePath
|
||||
}
|
||||
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
|
||||
|
||||
writer = NewSQLWriter(file, watchedAddressesFilePath)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
||||
|
@ -81,11 +81,11 @@ func testPushBlockAndState(t *testing.T, block *types.Block, receipts types.Rece
|
||||
}
|
||||
|
||||
func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
|
||||
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
if _, err := os.Stat(file.CSVTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.TestConfig)
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), chainConf, file.CSVTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
@ -118,7 +118,7 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
|
||||
}
|
||||
|
||||
func dumpData(t *testing.T) {
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
@ -127,7 +127,7 @@ func dumpData(t *testing.T) {
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
err := os.Remove(file.CSVTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
err = sqlxdb.Close()
|
||||
require.NoError(t, err)
|
||||
|
@ -35,13 +35,12 @@ import (
|
||||
func setupLegacy(t *testing.T) {
|
||||
mockLegacyBlock = legacyData.MockBlock
|
||||
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
|
||||
file.TestConfig.Mode = file.SQL
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig)
|
||||
ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.SQLTestConfig)
|
||||
require.NoError(t, err)
|
||||
var tx interfaces.Batch
|
||||
tx, err = ind.PushBlock(
|
||||
@ -58,6 +57,7 @@ func setupLegacy(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
for _, node := range legacyData.StateDiffs {
|
||||
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
|
||||
require.NoError(t, err)
|
||||
@ -66,7 +66,6 @@ func setupLegacy(t *testing.T) {
|
||||
require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
|
||||
sqlxdb, err = sqlx.Connect("postgres", connStr)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
|
||||
@ -74,7 +73,7 @@ func setupLegacy(t *testing.T) {
|
||||
}
|
||||
|
||||
func dumpFileData(t *testing.T) {
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
@ -84,7 +83,7 @@ func dumpFileData(t *testing.T) {
|
||||
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
|
||||
resetDB(t)
|
||||
|
||||
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
|
||||
sqlFileBytes, err := os.ReadFile(file.SQLTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = sqlxdb.Exec(string(sqlFileBytes))
|
||||
@ -94,10 +93,10 @@ func resetAndDumpWatchedAddressesFileData(t *testing.T) {
|
||||
func tearDown(t *testing.T) {
|
||||
file.TearDownDB(t, sqlxdb)
|
||||
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
if err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
@ -44,20 +44,17 @@ import (
|
||||
)
|
||||
|
||||
func setupIndexer(t *testing.T) {
|
||||
file.TestConfig.Mode = file.SQL
|
||||
file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.sql"
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.FilePath)
|
||||
if _, err := os.Stat(file.SQLTestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.FilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.TestConfig.WatchedAddressesFilePath)
|
||||
if _, err := os.Stat(file.SQLTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
|
||||
err := os.Remove(file.SQLTestConfig.WatchedAddressesFilePath)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
|
||||
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.SQLTestConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
connStr := postgres.DefaultConfig.DbConnectionString()
|
||||
|
184
statediff/indexer/database/file/types/schema.go
Normal file
184
statediff/indexer/database/file/types/schema.go
Normal file
@ -0,0 +1,184 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package types
|
||||
|
||||
var TableIPLDBlock = Table{
|
||||
`public.blocks`,
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "key", dbType: text},
|
||||
{name: "data", dbType: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableNodeInfo = Table{
|
||||
Name: `public.nodes`,
|
||||
Columns: []column{
|
||||
{name: "genesis_block", dbType: varchar},
|
||||
{name: "network_id", dbType: varchar},
|
||||
{name: "node_id", dbType: varchar},
|
||||
{name: "client_name", dbType: varchar},
|
||||
{name: "chain_id", dbType: integer},
|
||||
},
|
||||
}
|
||||
|
||||
var TableHeader = Table{
|
||||
"eth.header_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "block_hash", dbType: varchar},
|
||||
{name: "parent_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "td", dbType: numeric},
|
||||
{name: "node_id", dbType: varchar},
|
||||
{name: "reward", dbType: numeric},
|
||||
{name: "state_root", dbType: varchar},
|
||||
{name: "tx_root", dbType: varchar},
|
||||
{name: "receipt_root", dbType: varchar},
|
||||
{name: "uncle_root", dbType: varchar},
|
||||
{name: "bloom", dbType: bytea},
|
||||
{name: "timestamp", dbType: numeric},
|
||||
{name: "mh_key", dbType: text},
|
||||
{name: "times_validated", dbType: integer},
|
||||
{name: "coinbase", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateNode = Table{
|
||||
"eth.state_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_leaf_key", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "node_type", dbType: integer},
|
||||
{name: "diff", dbType: boolean},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStorageNode = Table{
|
||||
"eth.storage_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "storage_leaf_key", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "storage_path", dbType: bytea},
|
||||
{name: "node_type", dbType: integer},
|
||||
{name: "diff", dbType: boolean},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableUncle = Table{
|
||||
"eth.uncle_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "block_hash", dbType: varchar},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "parent_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "reward", dbType: numeric},
|
||||
{name: "mh_key", dbType: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableTransaction = Table{
|
||||
"eth.transaction_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "tx_hash", dbType: varchar},
|
||||
{name: "cid", dbType: text},
|
||||
{name: "dst", dbType: varchar},
|
||||
{name: "src", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "mh_key", dbType: text},
|
||||
{name: "tx_data", dbType: bytea},
|
||||
{name: "tx_type", dbType: integer},
|
||||
{name: "value", dbType: numeric},
|
||||
},
|
||||
}
|
||||
|
||||
var TableAccessListElement = Table{
|
||||
"eth.access_list_elements",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "tx_id", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "storage_keys", dbType: varchar, isArray: true},
|
||||
},
|
||||
}
|
||||
|
||||
var TableReceipt = Table{
|
||||
"eth.receipt_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "tx_id", dbType: varchar},
|
||||
{name: "leaf_cid", dbType: text},
|
||||
{name: "contract", dbType: varchar},
|
||||
{name: "contract_hash", dbType: varchar},
|
||||
{name: "leaf_mh_key", dbType: text},
|
||||
{name: "post_state", dbType: varchar},
|
||||
{name: "post_status", dbType: integer},
|
||||
{name: "log_root", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableLog = Table{
|
||||
"eth.log_cids",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "leaf_cid", dbType: text},
|
||||
{name: "leaf_mh_key", dbType: text},
|
||||
{name: "rct_id", dbType: varchar},
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "index", dbType: integer},
|
||||
{name: "topic0", dbType: varchar},
|
||||
{name: "topic1", dbType: varchar},
|
||||
{name: "topic2", dbType: varchar},
|
||||
{name: "topic3", dbType: varchar},
|
||||
{name: "log_data", dbType: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateAccount = Table{
|
||||
"eth.state_accounts",
|
||||
[]column{
|
||||
{name: "block_number", dbType: bigint},
|
||||
{name: "header_id", dbType: varchar},
|
||||
{name: "state_path", dbType: bytea},
|
||||
{name: "balance", dbType: numeric},
|
||||
{name: "nonce", dbType: bigint},
|
||||
{name: "code_hash", dbType: bytea},
|
||||
{name: "storage_root", dbType: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableWatchedAddresses = Table{
|
||||
"eth_meta.watched_addresses",
|
||||
[]column{
|
||||
{name: "address", dbType: varchar},
|
||||
{name: "created_at", dbType: bigint},
|
||||
{name: "watched_at", dbType: bigint},
|
||||
{name: "last_filled_at", dbType: bigint},
|
||||
},
|
||||
}
|
@ -37,7 +37,7 @@ const (
|
||||
|
||||
type column struct {
|
||||
name string
|
||||
typ colType
|
||||
dbType colType
|
||||
isArray bool
|
||||
}
|
||||
type Table struct {
|
||||
@ -48,10 +48,10 @@ type Table struct {
|
||||
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
|
||||
var row []string
|
||||
for i, col := range tbl.Columns {
|
||||
value := col.typ.formatter()(args[i])
|
||||
value := col.dbType.formatter()(args[i])
|
||||
|
||||
if col.isArray {
|
||||
valueList := funk.Map(args[i], col.typ.formatter()).([]string)
|
||||
valueList := funk.Map(args[i], col.dbType.formatter()).([]string)
|
||||
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
|
||||
}
|
||||
|
||||
@ -62,7 +62,7 @@ func (tbl *Table) ToCsvRow(args ...interface{}) []string {
|
||||
|
||||
func (tbl *Table) VarcharColumns() []string {
|
||||
columns := funk.Filter(tbl.Columns, func(col column) bool {
|
||||
return col.typ == varchar
|
||||
return col.dbType == varchar
|
||||
}).([]column)
|
||||
|
||||
columnNames := funk.Map(columns, func(col column) string {
|
@ -1,184 +0,0 @@
|
||||
// Copyright 2022 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package types
|
||||
|
||||
var TableIPLDBlock = Table{
|
||||
`public.blocks`,
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "key", typ: text},
|
||||
{name: "data", typ: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableNodeInfo = Table{
|
||||
Name: `public.nodes`,
|
||||
Columns: []column{
|
||||
{name: "genesis_block", typ: varchar},
|
||||
{name: "network_id", typ: varchar},
|
||||
{name: "node_id", typ: varchar},
|
||||
{name: "client_name", typ: varchar},
|
||||
{name: "chain_id", typ: integer},
|
||||
},
|
||||
}
|
||||
|
||||
var TableHeader = Table{
|
||||
"eth.header_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "block_hash", typ: varchar},
|
||||
{name: "parent_hash", typ: varchar},
|
||||
{name: "cid", typ: text},
|
||||
{name: "td", typ: numeric},
|
||||
{name: "node_id", typ: varchar},
|
||||
{name: "reward", typ: numeric},
|
||||
{name: "state_root", typ: varchar},
|
||||
{name: "tx_root", typ: varchar},
|
||||
{name: "receipt_root", typ: varchar},
|
||||
{name: "uncle_root", typ: varchar},
|
||||
{name: "bloom", typ: bytea},
|
||||
{name: "timestamp", typ: numeric},
|
||||
{name: "mh_key", typ: text},
|
||||
{name: "times_validated", typ: integer},
|
||||
{name: "coinbase", typ: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateNode = Table{
|
||||
"eth.state_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "header_id", typ: varchar},
|
||||
{name: "state_leaf_key", typ: varchar},
|
||||
{name: "cid", typ: text},
|
||||
{name: "state_path", typ: bytea},
|
||||
{name: "node_type", typ: integer},
|
||||
{name: "diff", typ: boolean},
|
||||
{name: "mh_key", typ: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStorageNode = Table{
|
||||
"eth.storage_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "header_id", typ: varchar},
|
||||
{name: "state_path", typ: bytea},
|
||||
{name: "storage_leaf_key", typ: varchar},
|
||||
{name: "cid", typ: text},
|
||||
{name: "storage_path", typ: bytea},
|
||||
{name: "node_type", typ: integer},
|
||||
{name: "diff", typ: boolean},
|
||||
{name: "mh_key", typ: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableUncle = Table{
|
||||
"eth.uncle_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "block_hash", typ: varchar},
|
||||
{name: "header_id", typ: varchar},
|
||||
{name: "parent_hash", typ: varchar},
|
||||
{name: "cid", typ: text},
|
||||
{name: "reward", typ: numeric},
|
||||
{name: "mh_key", typ: text},
|
||||
},
|
||||
}
|
||||
|
||||
var TableTransaction = Table{
|
||||
"eth.transaction_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "header_id", typ: varchar},
|
||||
{name: "tx_hash", typ: varchar},
|
||||
{name: "cid", typ: text},
|
||||
{name: "dst", typ: varchar},
|
||||
{name: "src", typ: varchar},
|
||||
{name: "index", typ: integer},
|
||||
{name: "mh_key", typ: text},
|
||||
{name: "tx_data", typ: bytea},
|
||||
{name: "tx_type", typ: integer},
|
||||
{name: "value", typ: numeric},
|
||||
},
|
||||
}
|
||||
|
||||
var TableAccessListElement = Table{
|
||||
"eth.access_list_elements",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "tx_id", typ: varchar},
|
||||
{name: "index", typ: integer},
|
||||
{name: "address", typ: varchar},
|
||||
{name: "storage_keys", typ: varchar, isArray: true},
|
||||
},
|
||||
}
|
||||
|
||||
var TableReceipt = Table{
|
||||
"eth.receipt_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "tx_id", typ: varchar},
|
||||
{name: "leaf_cid", typ: text},
|
||||
{name: "contract", typ: varchar},
|
||||
{name: "contract_hash", typ: varchar},
|
||||
{name: "leaf_mh_key", typ: text},
|
||||
{name: "post_state", typ: varchar},
|
||||
{name: "post_status", typ: integer},
|
||||
{name: "log_root", typ: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableLog = Table{
|
||||
"eth.log_cids",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "leaf_cid", typ: text},
|
||||
{name: "leaf_mh_key", typ: text},
|
||||
{name: "rct_id", typ: varchar},
|
||||
{name: "address", typ: varchar},
|
||||
{name: "index", typ: integer},
|
||||
{name: "topic0", typ: varchar},
|
||||
{name: "topic1", typ: varchar},
|
||||
{name: "topic2", typ: varchar},
|
||||
{name: "topic3", typ: varchar},
|
||||
{name: "log_data", typ: bytea},
|
||||
},
|
||||
}
|
||||
|
||||
var TableStateAccount = Table{
|
||||
"eth.state_accounts",
|
||||
[]column{
|
||||
{name: "block_number", typ: bigint},
|
||||
{name: "header_id", typ: varchar},
|
||||
{name: "state_path", typ: bytea},
|
||||
{name: "balance", typ: numeric},
|
||||
{name: "nonce", typ: bigint},
|
||||
{name: "code_hash", typ: bytea},
|
||||
{name: "storage_root", typ: varchar},
|
||||
},
|
||||
}
|
||||
|
||||
var TableWatchedAddresses = Table{
|
||||
"eth_meta.watched_addresses",
|
||||
[]column{
|
||||
{name: "address", typ: varchar},
|
||||
{name: "created_at", typ: bigint},
|
||||
{name: "watched_at", typ: bigint},
|
||||
{name: "last_filled_at", typ: bigint},
|
||||
},
|
||||
}
|
Loading…
Reference in New Issue
Block a user