diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 21edd9a32..6c40f5529 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -194,7 +194,8 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { switch dbType { case shared.FILE: indexerConfig = file.Config{ - FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), + FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), + WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name), } case shared.POSTGRES: driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index f931d3ffa..1bc38c8b0 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -178,6 +178,7 @@ var ( utils.StateDiffFilePath, utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, + utils.StateDiffWatchedAddressesFilePath, configFileFlag, } diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index c8338ac5f..6cd6df946 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -248,6 +248,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.StateDiffFilePath, utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, + utils.StateDiffWatchedAddressesFilePath, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 92f8cfc93..275f70a98 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -872,6 +872,10 @@ var ( Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.", Value: "./known_gaps.sql", } + StateDiffWatchedAddressesFilePath = cli.StringFlag{ + Name: "statediff.file.wapath", + Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode", + } StateDiffDBClientNameFlag = cli.StringFlag{ Name: "statediff.db.clientname", Usage: "Client name to use when writing state diffs to database", diff --git a/go.mod b/go.mod index 30b3c97ed..cd2a78f43 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/olekukonko/tablewriter v0.0.5 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 + github.com/pganalyze/pg_query_go/v2 v2.1.0 github.com/prometheus/tsdb v0.7.1 github.com/rjeczalik/notify v0.9.1 github.com/rs/cors v1.7.0 diff --git a/go.sum b/go.sum index dc748a005..26e893ea9 100644 --- a/go.sum +++ b/go.sum @@ -222,6 +222,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64= @@ -516,6 +517,8 @@ github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChl github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= +github.com/pganalyze/pg_query_go/v2 v2.1.0 h1:donwPZ4G/X+kMs7j5eYtKjdziqyOLVp3pkUrzb9lDl8= +github.com/pganalyze/pg_query_go/v2 v2.1.0/go.mod h1:XAxmVqz1tEGqizcQ3YSdN90vCOHBWjJi8URL1er5+cA= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/statediff/README.md b/statediff/README.md index 39ba24775..f262d7a8e 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -120,6 +120,8 @@ This service introduces a CLI flag namespace `statediff` `--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode +`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode + The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) e.g. diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index 6908e739f..fb9865f8d 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -497,32 +497,27 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.dump.Close() } -// LoadWatchedAddresses reads watched addresses from the database +// LoadWatchedAddresses satisfies the interfaces.StateDiffIndexer interface func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { - // TODO implement return nil, nil } -// InsertWatchedAddresses inserts the given addresses in the database +// InsertWatchedAddresses satisfies the interfaces.StateDiffIndexer interface func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - // TODO implement return nil } -// RemoveWatchedAddresses removes the given watched addresses from the database +// RemoveWatchedAddresses satisfies the interfaces.StateDiffIndexer interface func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { - // TODO implement return nil } -// SetWatchedAddresses clears and inserts the given addresses in the database +// SetWatchedAddresses satisfies the interfaces.StateDiffIndexer interface func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - // TODO implement return nil } -// ClearWatchedAddresses clears all the watched addresses from the database +// ClearWatchedAddresses satisfies the interfaces.StateDiffIndexer interface func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { - // TODO implement return nil } diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index c2c6804c0..33fd8cd17 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -23,8 +23,9 @@ import ( // Config holds params for writing sql statements out to a file type Config struct { - FilePath string - NodeInfo node.Info + FilePath string + NodeInfo node.Info + WatchedAddressesFilePath string } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 44806d558..ad497d2a7 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -17,6 +17,7 @@ package file import ( + "bufio" "context" "errors" "fmt" @@ -28,6 +29,8 @@ import ( "github.com/ipfs/go-cid" node "github.com/ipfs/go-ipld-format" "github.com/multiformats/go-multihash" + pg_query "github.com/pganalyze/pg_query_go/v2" + "github.com/thoas/go-funk" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -44,6 +47,9 @@ import ( ) const defaultFilePath = "./statediff.sql" +const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql" + +const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;" var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} @@ -57,6 +63,8 @@ type StateDiffIndexer struct { chainConfig *params.ChainConfig nodeID string wg *sync.WaitGroup + + watchedAddressesFilePath string } // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer @@ -73,16 +81,24 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) } log.Info("Writing statediff SQL statements to file", "file", filePath) + + watchedAddressesFilePath := config.WatchedAddressesFilePath + if watchedAddressesFilePath == "" { + watchedAddressesFilePath = defaultWatchedAddressesFilePath + } + log.Info("Writing watched addresses SQL statements to file", "file", filePath) + w := NewSQLWriter(file) wg := new(sync.WaitGroup) w.Loop() w.upsertNode(config.NodeInfo) w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) return &StateDiffIndexer{ - fileWriter: w, - chainConfig: chainConfig, - nodeID: config.NodeInfo.ID, - wg: wg, + fileWriter: w, + chainConfig: chainConfig, + nodeID: config.NodeInfo.ID, + wg: wg, + watchedAddressesFilePath: watchedAddressesFilePath, }, nil } @@ -479,32 +495,138 @@ func (sdi *StateDiffIndexer) Close() error { return sdi.fileWriter.Close() } -// LoadWatchedAddresses reads watched addresses from the database +// LoadWatchedAddresses loads watched addresses from a file func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { - // TODO implement - return nil, nil + // load sql statements from watched addresses file + stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + if err != nil { + return nil, err + } + + // extract addresses from the sql statements + watchedAddresses := []common.Address{} + for _, stmt := range stmts { + addressString, err := parseWatchedAddressStmt(stmt) + if err != nil { + return nil, err + } + watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString)) + } + + return watchedAddresses, nil } -// InsertWatchedAddresses inserts the given addresses in the database +// InsertWatchedAddresses inserts the given addresses in a file func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - // TODO implement - return nil + // load sql statements from watched addresses file + stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + if err != nil { + return err + } + + // append statements for new addresses to existing statements + for _, arg := range args { + stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) + stmts = append(stmts, stmt) + } + + return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) } -// RemoveWatchedAddresses removes the given watched addresses from the database +// RemoveWatchedAddresses removes the given watched addresses from a file func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { - // TODO implement - return nil + // load sql statements from watched addresses file + stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath) + if err != nil { + return err + } + + // get rid of statements having addresses to be removed + var updatedStmts []string + for _, stmt := range stmts { + addressString, err := parseWatchedAddressStmt(stmt) + if err != nil { + return err + } + + toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool { + return arg.Address == addressString + }) + + if !toRemove { + updatedStmts = append(updatedStmts, stmt) + } + } + + return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, updatedStmts) } -// SetWatchedAddresses clears and inserts the given addresses in the database +// SetWatchedAddresses clears and inserts the given addresses in a file func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - // TODO implement + var stmts []string + for _, arg := range args { + stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) + stmts = append(stmts, stmt) + } + + return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts) +} + +// ClearWatchedAddresses clears all the watched addresses from a file +func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { + return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, common.Big0) +} + +// loadWatchedAddressesStmts loads sql statements from the given file in a string slice +func loadWatchedAddressesStmts(filePath string) ([]string, error) { + // return emtpy if file does not exist + if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { + return []string{}, nil + } + + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer file.Close() + + var stmts []string + scanner := bufio.NewScanner(file) + for scanner.Scan() { + stmts = append(stmts, scanner.Text()) + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error loading watched addresses: %v", err) + } + + return stmts, nil +} + +// dumpWatchedAddressesStmts dumps sql statements to the given file +func dumpWatchedAddressesStmts(filePath string, stmts []string) error { + file, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err) + } + defer file.Close() + + for _, stmt := range stmts { + _, err := file.Write([]byte(stmt + "\n")) + if err != nil { + return fmt.Errorf("error inserting watched_addresses entry: %v", err) + } + } + return nil } -// ClearWatchedAddresses clears all the watched addresses from the database -func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { - // TODO implement - return nil +// parseWatchedAddressStmt parses given sql insert statement to extract the address argument +func parseWatchedAddressStmt(stmt string) (string, error) { + parseResult, err := pg_query.Parse(stmt) + if err != nil { + return "", fmt.Errorf("error parsing sql stmt: %v", err) + } + + return parseResult.Stmts[0].Stmt.GetInsertStmt().SelectStmt.GetSelectStmt().ValuesLists[0].GetList().Items[0].GetAConst().GetVal().GetString_().Str, nil }