Add mode to write to CSV files in statediff file writer #249
@ -211,8 +211,16 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
}
|
||||
switch dbType {
|
||||
case shared.FILE:
|
||||
fileModeStr := ctx.GlobalString(utils.StateDiffFileMode.Name)
|
||||
fileMode, err := file.ResolveFileMode(fileModeStr)
|
||||
if err != nil {
|
||||
utils.Fatalf("%v", err)
|
||||
}
|
||||
|
||||
indexerConfig = file.Config{
|
||||
OutputDir: ctx.GlobalString(utils.StateDiffFilePath.Name),
|
||||
Mode: fileMode,
|
||||
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvOutput.Name),
|
||||
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
|
||||
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
|
||||
}
|
||||
case shared.POSTGRES:
|
||||
|
@ -171,6 +171,8 @@ var (
|
||||
utils.StateDiffDBClientNameFlag,
|
||||
utils.StateDiffWritingFlag,
|
||||
utils.StateDiffWorkersFlag,
|
||||
utils.StateDiffFileMode,
|
||||
utils.StateDiffFileCsvOutput,
|
||||
utils.StateDiffFilePath,
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
|
@ -244,6 +244,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
|
||||
utils.StateDiffDBClientNameFlag,
|
||||
utils.StateDiffWritingFlag,
|
||||
utils.StateDiffWorkersFlag,
|
||||
utils.StateDiffFileMode,
|
||||
utils.StateDiffFileCsvOutput,
|
||||
utils.StateDiffFilePath,
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
|
@ -902,9 +902,18 @@ var (
|
||||
Name: "statediff.db.nodeid",
|
||||
Usage: "Node ID to use when writing state diffs to database",
|
||||
}
|
||||
StateDiffFileMode = cli.StringFlag{
|
||||
Name: "statediff.file.mode",
|
||||
Usage: "Statediff file writing mode (current options: csv, sql)",
|
||||
Value: "csv",
|
||||
}
|
||||
StateDiffFileCsvOutput = cli.StringFlag{
|
||||
Name: "statediff.file.csvoutput",
|
||||
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
|
||||
}
|
||||
StateDiffFilePath = cli.StringFlag{
|
||||
Name: "statediff.file.path",
|
||||
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
|
||||
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
|
||||
}
|
||||
StateDiffKnownGapsFilePath = cli.StringFlag{
|
||||
Name: "statediff.knowngapsfile.path",
|
||||
|
@ -17,13 +17,39 @@
|
||||
package file
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
)
|
||||
|
||||
// FileMode to explicitly type the mode of file writer we are using
|
||||
type FileMode string
|
||||
|
||||
const (
|
||||
CSV FileMode = "CSV"
|
||||
SQL FileMode = "SQL"
|
||||
Unknown FileMode = "Unknown"
|
||||
)
|
||||
|
||||
// ResolveFileMode resolves a FileMode from a provided string
|
||||
func ResolveFileMode(str string) (FileMode, error) {
|
||||
switch strings.ToLower(str) {
|
||||
case "csv":
|
||||
return CSV, nil
|
||||
case "sql":
|
||||
return SQL, nil
|
||||
default:
|
||||
return Unknown, fmt.Errorf("unrecognized file type string: %s", str)
|
||||
}
|
||||
}
|
||||
|
||||
// Config holds params for writing CSV files out to a directory
|
||||
type Config struct {
|
||||
Mode FileMode
|
||||
OutputDir string
|
||||
FilePath string
|
||||
WatchedAddressesFilePath string
|
||||
NodeInfo node.Info
|
||||
}
|
||||
@ -35,7 +61,9 @@ func (c Config) Type() shared.DBType {
|
||||
|
||||
// TestConfig config for unit tests
|
||||
var TestConfig = Config{
|
||||
Mode: CSV,
|
||||
OutputDir: "./statediffing_test",
|
||||
FilePath: "./statediffing_test_file.sql",
|
||||
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
|
||||
NodeInfo: node.Info{
|
||||
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
|
||||
|
@ -122,6 +122,7 @@ func NewCSVWriter(path string) (*CSVWriter, error) {
|
||||
csvWriter := &CSVWriter{
|
||||
writers: writers,
|
||||
dir: path,
|
||||
rows: make(chan tableRow),
|
||||
flushChan: make(chan struct{}),
|
||||
flushFinished: make(chan struct{}),
|
||||
quitChan: make(chan struct{}),
|
||||
|
@ -48,6 +48,7 @@ import (
|
||||
)
|
||||
|
||||
const defaultOutputDir = "./statediff_output"
|
||||
const defaultFilePath = "./statediff.sql"
|
||||
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
|
||||
|
||||
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
|
||||
@ -71,12 +72,39 @@ type StateDiffIndexer struct {
|
||||
|
||||
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
|
||||
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
|
||||
outputDir := config.OutputDir
|
||||
if outputDir == "" {
|
||||
outputDir = defaultOutputDir
|
||||
}
|
||||
var err error
|
||||
var writer FileWriter
|
||||
switch config.Mode {
|
||||
case CSV:
|
||||
outputDir := config.OutputDir
|
||||
if outputDir == "" {
|
||||
outputDir = defaultOutputDir
|
||||
}
|
||||
|
||||
log.Info("Writing statediff CSV files to directory", "file", outputDir)
|
||||
log.Info("Writing statediff CSV files to directory", "file", outputDir)
|
||||
|
||||
writer, err = NewCSVWriter(outputDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case SQL:
|
||||
filePath := config.FilePath
|
||||
if filePath == "" {
|
||||
filePath = defaultFilePath
|
||||
}
|
||||
if _, err := os.Stat(filePath); !errors.Is(err, os.ErrNotExist) {
|
||||
return nil, fmt.Errorf("cannot create file, file (%s) already exists", filePath)
|
||||
}
|
||||
file, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
|
||||
}
|
||||
log.Info("Writing statediff SQL statements to file", "file", filePath)
|
||||
|
||||
writer = NewSQLWriter(file)
|
||||
default:
|
||||
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
|
||||
}
|
||||
|
||||
watchedAddressesFilePath := config.WatchedAddressesFilePath
|
||||
if watchedAddressesFilePath == "" {
|
||||
@ -84,16 +112,12 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
|
||||
}
|
||||
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
|
||||
|
||||
w, err := NewCSVWriter(outputDir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wg := new(sync.WaitGroup)
|
||||
w.Loop()
|
||||
w.upsertNode(config.NodeInfo)
|
||||
writer.Loop()
|
||||
writer.upsertNode(config.NodeInfo)
|
||||
|
||||
return &StateDiffIndexer{
|
||||
fileWriter: w,
|
||||
fileWriter: writer,
|
||||
chainConfig: chainConfig,
|
||||
nodeID: config.NodeInfo.ID,
|
||||
wg: wg,
|
||||
|
Loading…
Reference in New Issue
Block a user