Create file for watched addresses only when required

This commit is contained in:
Prathamesh Musale 2022-03-21 18:03:48 +05:30
parent 710312378a
commit 52df3460d6
4 changed files with 49 additions and 40 deletions

View File

@ -24,8 +24,8 @@ import (
// Config holds params for writing sql statements out to a file // Config holds params for writing sql statements out to a file
type Config struct { type Config struct {
FilePath string FilePath string
NodeInfo node.Info
WatchedAddressesFilePath string WatchedAddressesFilePath string
NodeInfo node.Info
} }
// Type satisfies interfaces.Config // Type satisfies interfaces.Config
@ -35,7 +35,8 @@ func (c Config) Type() shared.DBType {
// TestConfig config for unit tests // TestConfig config for unit tests
var TestConfig = Config{ var TestConfig = Config{
FilePath: "./statediffing_test_file.sql", FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: node.Info{ NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1", NetworkID: "1",
@ -43,5 +44,4 @@ var TestConfig = Config{
ID: "mockNodeID", ID: "mockNodeID",
ClientName: "go-ethereum", ClientName: "go-ethereum",
}, },
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
} }

View File

@ -86,13 +86,6 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
if watchedAddressesFilePath == "" { if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath watchedAddressesFilePath = defaultWatchedAddressesFilePath
} }
if _, err := os.Stat(watchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create watched addresses file, file (%s) already exists", watchedAddressesFilePath)
}
_, err = os.Create(watchedAddressesFilePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath) log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
w := NewSQLWriter(file) w := NewSQLWriter(file)
@ -505,7 +498,7 @@ func (sdi *StateDiffIndexer) Close() error {
// LoadWatchedAddresses loads watched addresses from a file // LoadWatchedAddresses loads watched addresses from a file
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// load sql statements from watched addresses file // load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -513,7 +506,7 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// extract addresses from the sql statements // extract addresses from the sql statements
watchedAddresses := []common.Address{} watchedAddresses := []common.Address{}
for _, stmt := range stmts { for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt) addressString, err := parseWatchedAddressStatement(stmt)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -526,7 +519,7 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// InsertWatchedAddresses inserts the given addresses in a file // InsertWatchedAddresses inserts the given addresses in a file
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// load sql statements from watched addresses file // load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil { if err != nil {
return err return err
} }
@ -534,7 +527,7 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
// get already watched addresses // get already watched addresses
var watchedAddresses []string var watchedAddresses []string
for _, stmt := range stmts { for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt) addressString, err := parseWatchedAddressStatement(stmt)
if err != nil { if err != nil {
return err return err
} }
@ -553,21 +546,21 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
stmts = append(stmts, stmt) stmts = append(stmts, stmt)
} }
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
} }
// RemoveWatchedAddresses removes the given watched addresses from a file // RemoveWatchedAddresses removes the given watched addresses from a file
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// load sql statements from watched addresses file // load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil { if err != nil {
return err return err
} }
// get rid of statements having addresses to be removed // get rid of statements having addresses to be removed
var updatedStmts []string var filteredStmts []string
for _, stmt := range stmts { for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt) addressString, err := parseWatchedAddressStatement(stmt)
if err != nil { if err != nil {
return err return err
} }
@ -577,11 +570,11 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA
}) })
if !toRemove { if !toRemove {
updatedStmts = append(updatedStmts, stmt) filteredStmts = append(filteredStmts, stmt)
} }
} }
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, updatedStmts) return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts)
} }
// SetWatchedAddresses clears and inserts the given addresses in a file // SetWatchedAddresses clears and inserts the given addresses in a file
@ -592,18 +585,22 @@ func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg,
stmts = append(stmts, stmt) stmts = append(stmts, stmt)
} }
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
} }
// ClearWatchedAddresses clears all the watched addresses from a file // ClearWatchedAddresses clears all the watched addresses from a file
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, common.Big0) return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0))
} }
// loadWatchedAddressesStmts loads sql statements from the given file in a string slice // loadWatchedAddressesStatements loads sql statements from the given file in a string slice
func loadWatchedAddressesStmts(filePath string) ([]string, error) { func loadWatchedAddressesStatements(filePath string) ([]string, error) {
file, err := os.Open(filePath) file, err := os.Open(filePath)
if err != nil { if err != nil {
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err) return nil, fmt.Errorf("error opening watched addresses file: %v", err)
} }
defer file.Close() defer file.Close()
@ -621,8 +618,8 @@ func loadWatchedAddressesStmts(filePath string) ([]string, error) {
return stmts, nil return stmts, nil
} }
// dumpWatchedAddressesStmts dumps sql statements to the given file // dumpWatchedAddressesStatements dumps sql statements to the given file
func dumpWatchedAddressesStmts(filePath string, stmts []string) error { func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
file, err := os.Create(filePath) file, err := os.Create(filePath)
if err != nil { if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err) return fmt.Errorf("error creating watched addresses file: %v", err)
@ -639,12 +636,23 @@ func dumpWatchedAddressesStmts(filePath string, stmts []string) error {
return nil return nil
} }
// parseWatchedAddressStmt parses given sql insert statement to extract the address argument // parseWatchedAddressStatement parses given sql insert statement to extract the address argument
func parseWatchedAddressStmt(stmt string) (string, error) { func parseWatchedAddressStatement(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt) parseResult, err := pg_query.Parse(stmt)
if err != nil { if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err) return "", fmt.Errorf("error parsing sql stmt: %v", err)
} }
return parseResult.Stmts[0].Stmt.GetInsertStmt().SelectStmt.GetSelectStmt().ValuesLists[0].GetList().Items[0].GetAConst().GetVal().GetString_().Str, nil // extract address argument from parse output for a SQL statement of form
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
SelectStmt.GetSelectStmt().
ValuesLists[0].GetList().
Items[0].GetAConst().
GetVal().
GetString_().
Str
return addressString, nil
} }

View File

@ -89,7 +89,7 @@ func dumpFileData(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func dumpWatchedAddressesFileData(t *testing.T) { func resetAndDumpWatchedAddressesFileData(t *testing.T) {
resetDB(t) resetDB(t)
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath) sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
@ -115,8 +115,9 @@ func tearDown(t *testing.T) {
err := os.Remove(file.TestConfig.FilePath) err := os.Remove(file.TestConfig.FilePath)
require.NoError(t, err) require.NoError(t, err)
err = os.Remove(file.TestConfig.WatchedAddressesFilePath) if err := os.Remove(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
require.NoError(t, err) require.NoError(t, err)
}
err = sqlxdb.Close() err = sqlxdb.Close()
require.NoError(t, err) require.NoError(t, err)

View File

@ -769,7 +769,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -817,7 +817,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -853,7 +853,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.RemoveWatchedAddresses(args) err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -882,7 +882,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.RemoveWatchedAddresses(args) err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -934,7 +934,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -986,7 +986,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -1021,7 +1021,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.ClearWatchedAddresses() err = ind.ClearWatchedAddresses()
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)
@ -1040,7 +1040,7 @@ func TestFileWatchAddressMethods(t *testing.T) {
err = ind.ClearWatchedAddresses() err = ind.ClearWatchedAddresses()
require.NoError(t, err) require.NoError(t, err)
dumpWatchedAddressesFileData(t) resetAndDumpWatchedAddressesFileData(t)
rows := []res{} rows := []res{}
err = sqlxdb.Select(&rows, pgStr) err = sqlxdb.Select(&rows, pgStr)