From 52df3460d6cb74acd95284ef34d083403a461d5f Mon Sep 17 00:00:00 2001 From: prathamesh0 Date: Mon, 21 Mar 2022 18:03:48 +0530 Subject: [PATCH] Create file for watched addresses only when required --- statediff/indexer/database/file/config.go | 6 +- statediff/indexer/database/file/indexer.go | 60 +++++++++++-------- .../database/file/indexer_legacy_test.go | 7 ++- .../indexer/database/file/indexer_test.go | 16 ++--- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index ccf2c2e5f..a075e896b 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -24,8 +24,8 @@ import ( // Config holds params for writing sql statements out to a file type Config struct { FilePath string - NodeInfo node.Info WatchedAddressesFilePath string + NodeInfo node.Info } // Type satisfies interfaces.Config @@ -35,7 +35,8 @@ func (c Config) Type() shared.DBType { // TestConfig config for unit tests var TestConfig = Config{ - FilePath: "./statediffing_test_file.sql", + FilePath: "./statediffing_test_file.sql", + WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql", NodeInfo: node.Info{ GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", NetworkID: "1", @@ -43,5 +44,4 @@ var TestConfig = Config{ ID: "mockNodeID", ClientName: "go-ethereum", }, - WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql", } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 490b147c4..c842cef12 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -86,13 +86,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c if watchedAddressesFilePath == "" { watchedAddressesFilePath = defaultWatchedAddressesFilePath } - if _, err := os.Stat(watchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) { - return nil, fmt.Errorf("cannot create watched addresses file, file (%s) already exists", watchedAddressesFilePath) - } - _, err = os.Create(watchedAddressesFilePath) - if err != nil { - return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) - } log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath) w := NewSQLWriter(file) @@ -505,7 +498,7 @@ func (sdi *StateDiffIndexer) Close() error { // LoadWatchedAddresses loads watched addresses from a file func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) if err != nil { return nil, err } @@ -513,7 +506,7 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { // extract addresses from the sql statements watchedAddresses := []common.Address{} for _, stmt := range stmts { - addressString, err := parseWatchedAddressStmt(stmt) + addressString, err := parseWatchedAddressStatement(stmt) if err != nil { return nil, err } @@ -526,7 +519,7 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { // InsertWatchedAddresses inserts the given addresses in a file func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) if err != nil { return err } @@ -534,7 +527,7 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA // get already watched addresses var watchedAddresses []string for _, stmt := range stmts { - addressString, err := parseWatchedAddressStmt(stmt) + addressString, err := parseWatchedAddressStatement(stmt) if err != nil { return err } @@ -553,21 +546,21 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA stmts = append(stmts, stmt) } - return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) + return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts) } // RemoveWatchedAddresses removes the given watched addresses from a file func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { // load sql statements from watched addresses file - stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath) if err != nil { return err } // get rid of statements having addresses to be removed - var updatedStmts []string + var filteredStmts []string for _, stmt := range stmts { - addressString, err := parseWatchedAddressStmt(stmt) + addressString, err := parseWatchedAddressStatement(stmt) if err != nil { return err } @@ -577,11 +570,11 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA }) if !toRemove { - updatedStmts = append(updatedStmts, stmt) + filteredStmts = append(filteredStmts, stmt) } } - return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, updatedStmts) + return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts) } // SetWatchedAddresses clears and inserts the given addresses in a file @@ -592,18 +585,22 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, stmts = append(stmts, stmt) } - return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) + return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts) } // ClearWatchedAddresses clears all the watched addresses from a file func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { - return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, common.Big0) + return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0)) } -// loadWatchedAddressesStmts loads sql statements from the given file in a string slice -func loadWatchedAddressesStmts(filePath string) ([]string, error) { +// loadWatchedAddressesStatements loads sql statements from the given file in a string slice +func loadWatchedAddressesStatements(filePath string) ([]string, error) { file, err := os.Open(filePath) if err != nil { + if errors.Is(err, os.ErrNotExist) { + return []string{}, nil + } + return nil, fmt.Errorf("error opening watched addresses file: %v", err) } defer file.Close() @@ -621,8 +618,8 @@ func loadWatchedAddressesStmts(filePath string) ([]string, error) { return stmts, nil } -// dumpWatchedAddressesStmts dumps sql statements to the given file -func dumpWatchedAddressesStmts(filePath string, stmts []string) error { +// dumpWatchedAddressesStatements dumps sql statements to the given file +func dumpWatchedAddressesStatements(filePath string, stmts []string) error { file, err := os.Create(filePath) if err != nil { return fmt.Errorf("error creating watched addresses file: %v", err) @@ -639,12 +636,23 @@ func dumpWatchedAddressesStmts(filePath string, stmts []string) error { return nil } -// parseWatchedAddressStmt parses given sql insert statement to extract the address argument -func parseWatchedAddressStmt(stmt string) (string, error) { +// parseWatchedAddressStatement parses given sql insert statement to extract the address argument +func parseWatchedAddressStatement(stmt string) (string, error) { parseResult, err := pg_query.Parse(stmt) if err != nil { return "", fmt.Errorf("error parsing sql stmt: %v", err) } - return parseResult.Stmts[0].Stmt.GetInsertStmt().SelectStmt.GetSelectStmt().ValuesLists[0].GetList().Items[0].GetAConst().GetVal().GetString_().Str, nil + // extract address argument from parse output for a SQL statement of form + // "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) + // VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;" + addressString := parseResult.Stmts[0].Stmt.GetInsertStmt(). + SelectStmt.GetSelectStmt(). + ValuesLists[0].GetList(). + Items[0].GetAConst(). + GetVal(). + GetString_(). + Str + + return addressString, nil } diff --git a/statediff/indexer/database/file/indexer_legacy_test.go b/statediff/indexer/database/file/indexer_legacy_test.go index aa5a4ae41..e89d168aa 100644 --- a/statediff/indexer/database/file/indexer_legacy_test.go +++ b/statediff/indexer/database/file/indexer_legacy_test.go @@ -89,7 +89,7 @@ func dumpFileData(t *testing.T) { require.NoError(t, err) } -func dumpWatchedAddressesFileData(t *testing.T) { +func resetAndDumpWatchedAddressesFileData(t *testing.T) { resetDB(t) sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath) @@ -115,8 +115,9 @@ func tearDown(t *testing.T) { err := os.Remove(file.TestConfig.FilePath) require.NoError(t, err) - err = os.Remove(file.TestConfig.WatchedAddressesFilePath) - require.NoError(t, err) + if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) { + require.NoError(t, err) + } err = sqlxdb.Close() require.NoError(t, err) diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go index aa8f059e2..fb5453fe6 100644 --- a/statediff/indexer/database/file/indexer_test.go +++ b/statediff/indexer/database/file/indexer_test.go @@ -769,7 +769,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -817,7 +817,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -853,7 +853,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.RemoveWatchedAddresses(args) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -882,7 +882,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.RemoveWatchedAddresses(args) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -934,7 +934,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -986,7 +986,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -1021,7 +1021,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.ClearWatchedAddresses() require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr) @@ -1040,7 +1040,7 @@ func TestFileWatchAddressMethods(t *testing.T) { err = ind.ClearWatchedAddresses() require.NoError(t, err) - dumpWatchedAddressesFileData(t) + resetAndDumpWatchedAddressesFileData(t) rows := []res{} err = sqlxdb.Select(&rows, pgStr)