Close files in CSV writer

This commit is contained in:
nabarun 2022-06-23 16:39:21 +05:30
parent 2a9e07de83
commit 043502a93d
8 changed files with 43 additions and 22 deletions

View File

@ -219,7 +219,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
indexerConfig = file.Config{
Mode: fileMode,
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvOutput.Name),
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}

View File

@ -172,7 +172,7 @@ var (
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvOutput,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,

View File

@ -245,7 +245,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvOutput,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,

View File

@ -907,8 +907,8 @@ var (
Usage: "Statediff file writing mode (current options: csv, sql)",
Value: "csv",
}
StateDiffFileCsvOutput = cli.StringFlag{
Name: "statediff.file.csvoutput",
StateDiffFileCsvDir = cli.StringFlag{
Name: "statediff.file.csvdir",
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
}
StateDiffFilePath = cli.StringFlag{

View File

@ -45,7 +45,7 @@ func ResolveFileMode(str string) (FileMode, error) {
}
}
// Config holds params for writing CSV files out to a directory
// Config holds params for writing out CSV or SQL files
type Config struct {
Mode FileMode
OutputDir string

View File

@ -67,6 +67,7 @@ type CSVWriter struct {
type fileWriter struct {
*csv.Writer
file *os.File
}
// fileWriters wraps the file writers for each output table
@ -77,13 +78,13 @@ func newFileWriter(path string) (ret fileWriter, err error) {
if err != nil {
return
}
ret = fileWriter{csv.NewWriter(file)}
return
}
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
ret = fileWriter{
Writer: csv.NewWriter(file),
file: file,
}
return
}
func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
@ -101,6 +102,21 @@ func makeFileWriters(dir string, tables []*types.Table) (fileWriters, error) {
return writers, nil
}
func (tx fileWriters) write(tbl *types.Table, args ...interface{}) error {
row := tbl.ToCsvRow(args...)
return tx[tbl.Name].Write(row)
}
func (tx fileWriters) close() error {
for _, w := range tx {
err := w.file.Close()
if err != nil {
return err
}
}
return nil
}
func (tx fileWriters) flush() error {
for _, w := range tx {
w.Flush()
@ -137,10 +153,10 @@ func (csw *CSVWriter) Loop() {
for {
select {
case row := <-csw.rows:
// TODO: Check available buffer size and flush
csw.writers.flush()
csw.writers.write(&row.table, row.values...)
err := csw.writers.write(&row.table, row.values...)
if err != nil {
panic(fmt.Sprintf("error writing csv buffer: %v", err))
}
case <-csw.quitChan:
if err := csw.writers.flush(); err != nil {
panic(fmt.Sprintf("error writing csv buffer to file: %v", err))
@ -168,7 +184,7 @@ func TableFile(dir, name string) string { return filepath.Join(dir, name+".csv")
func (csw *CSVWriter) Close() error {
close(csw.quitChan)
<-csw.doneChan
return nil
return csw.writers.close()
}
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {

View File

@ -25,14 +25,13 @@ import (
// Writer interface required by the file indexer
type FileWriter interface {
// Methods used to control the writer
Loop()
Close() error
Flush()
// Methods to write out data to tables
upsertNode(node nodeinfo.Info)
upsertIPLD(ipld models.IPLDModel)
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
upsertHeaderCID(header models.HeaderModel)
upsertUncleCID(uncle models.UncleModel)
upsertTransactionCID(transaction models.TxModel)
@ -42,4 +41,10 @@ type FileWriter interface {
upsertStateCID(stateNode models.StateNodeModel)
upsertStateAccount(stateAccount models.StateAccountModel)
upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel)
// Methods to upsert IPLD in different ways
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
}

View File

@ -161,7 +161,7 @@ var TableLog = Table{
}
var TableStateAccount = Table{
"eth.state_account",
"eth.state_accounts",
[]column{
{name: "block_number", typ: bigint},
{name: "header_id", typ: varchar},