Implement watch address indexer methods for file mode

This commit is contained in:
Prathamesh Musale 2022-03-18 18:29:15 +05:30
parent 12b9f50930
commit a7f9354f0e
10 changed files with 163 additions and 32 deletions

View File

@ -195,6 +195,7 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
case shared.FILE: case shared.FILE:
indexerConfig = file.Config{ indexerConfig = file.Config{
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name), FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
} }
case shared.POSTGRES: case shared.POSTGRES:
driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name) driverTypeStr := ctx.GlobalString(utils.StateDiffDBDriverTypeFlag.Name)

View File

@ -178,6 +178,7 @@ var (
utils.StateDiffFilePath, utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath, utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync, utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
configFileFlag, configFileFlag,
} }

View File

@ -248,6 +248,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffFilePath, utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath, utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync, utils.StateDiffWaitForSync,
utils.StateDiffWatchedAddressesFilePath,
}, },
}, },
{ {

View File

@ -872,6 +872,10 @@ var (
Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.", Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.",
Value: "./known_gaps.sql", Value: "./known_gaps.sql",
} }
StateDiffWatchedAddressesFilePath = cli.StringFlag{
Name: "statediff.file.wapath",
Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode",
}
StateDiffDBClientNameFlag = cli.StringFlag{ StateDiffDBClientNameFlag = cli.StringFlag{
Name: "statediff.db.clientname", Name: "statediff.db.clientname",
Usage: "Client name to use when writing state diffs to database", Usage: "Client name to use when writing state diffs to database",

1
go.mod
View File

@ -63,6 +63,7 @@ require (
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416
github.com/olekukonko/tablewriter v0.0.5 github.com/olekukonko/tablewriter v0.0.5
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7
github.com/pganalyze/pg_query_go/v2 v2.1.0
github.com/prometheus/tsdb v0.7.1 github.com/prometheus/tsdb v0.7.1
github.com/rjeczalik/notify v0.9.1 github.com/rjeczalik/notify v0.9.1
github.com/rs/cors v1.7.0 github.com/rs/cors v1.7.0

3
go.sum
View File

@ -222,6 +222,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64= github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa h1:Q75Upo5UN4JbPFURXZ8nLKYUvF85dyFRop/vQ0Rv+64=
@ -516,6 +517,8 @@ github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChl
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0= github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7/go.mod h1:CRroGNssyjTd/qIG2FyxByd2S8JEAZXBl4qUrZf8GS0=
github.com/pganalyze/pg_query_go/v2 v2.1.0 h1:donwPZ4G/X+kMs7j5eYtKjdziqyOLVp3pkUrzb9lDl8=
github.com/pganalyze/pg_query_go/v2 v2.1.0/go.mod h1:XAxmVqz1tEGqizcQ3YSdN90vCOHBWjJi8URL1er5+cA=
github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=

View File

@ -120,6 +120,8 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode
The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`) The service can only operate in full sync mode (`--syncmode=full`), but only the historical RPC endpoints require an archive node (`--gcmode=archive`)
e.g. e.g.

View File

@ -497,32 +497,27 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close() return sdi.dump.Close()
} }
// LoadWatchedAddresses reads watched addresses from the database // LoadWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// TODO implement
return nil, nil return nil, nil
} }
// InsertWatchedAddresses inserts the given addresses in the database // InsertWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil return nil
} }
// RemoveWatchedAddresses removes the given watched addresses from the database // RemoveWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// TODO implement
return nil return nil
} }
// SetWatchedAddresses clears and inserts the given addresses in the database // SetWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement
return nil return nil
} }
// ClearWatchedAddresses clears all the watched addresses from the database // ClearWatchedAddresses satisfies the interfaces.StateDiffIndexer interface
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
// TODO implement
return nil return nil
} }

View File

@ -25,6 +25,7 @@ import (
type Config struct { type Config struct {
FilePath string FilePath string
NodeInfo node.Info NodeInfo node.Info
WatchedAddressesFilePath string
} }
// Type satisfies interfaces.Config // Type satisfies interfaces.Config

View File

@ -17,6 +17,7 @@
package file package file
import ( import (
"bufio"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -28,6 +29,8 @@ import (
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format" node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash" "github.com/multiformats/go-multihash"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -44,6 +47,9 @@ import (
) )
const defaultFilePath = "./statediff.sql" const defaultFilePath = "./statediff.sql"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{} var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -57,6 +63,8 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
nodeID string nodeID string
wg *sync.WaitGroup wg *sync.WaitGroup
watchedAddressesFilePath string
} }
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@ -73,6 +81,13 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
} }
log.Info("Writing statediff SQL statements to file", "file", filePath) log.Info("Writing statediff SQL statements to file", "file", filePath)
watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses SQL statements to file", "file", filePath)
w := NewSQLWriter(file) w := NewSQLWriter(file)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
w.Loop() w.Loop()
@ -83,6 +98,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
chainConfig: chainConfig, chainConfig: chainConfig,
nodeID: config.NodeInfo.ID, nodeID: config.NodeInfo.ID,
wg: wg, wg: wg,
watchedAddressesFilePath: watchedAddressesFilePath,
}, nil }, nil
} }
@ -479,32 +495,138 @@ func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close() return sdi.fileWriter.Close()
} }
// LoadWatchedAddresses reads watched addresses from the database // LoadWatchedAddresses loads watched addresses from a file
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// TODO implement // load sql statements from watched addresses file
return nil, nil stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return nil, err
} }
// InsertWatchedAddresses inserts the given addresses in the database // extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}
return watchedAddresses, nil
}
// InsertWatchedAddresses inserts the given addresses in a file
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement // load sql statements from watched addresses file
return nil stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return err
} }
// RemoveWatchedAddresses removes the given watched addresses from the database // append statements for new addresses to existing statements
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts)
}
// RemoveWatchedAddresses removes the given watched addresses from a file
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// TODO implement // load sql statements from watched addresses file
return nil stmts, err := loadWatchedAddressesStmts(sdi.watchedAddressesFilePath)
if err != nil {
return err
} }
// SetWatchedAddresses clears and inserts the given addresses in the database // get rid of statements having addresses to be removed
var updatedStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return err
}
toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
return arg.Address == addressString
})
if !toRemove {
updatedStmts = append(updatedStmts, stmt)
}
}
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, updatedStmts)
}
// SetWatchedAddresses clears and inserts the given addresses in a file
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// TODO implement var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStmts(sdi.watchedAddressesFilePath, stmts)
}
// ClearWatchedAddresses clears all the watched addresses from a file
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, common.Big0)
}
// loadWatchedAddressesStmts loads sql statements from the given file in a string slice
func loadWatchedAddressesStmts(filePath string) ([]string, error) {
// return emtpy if file does not exist
if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
file, err := os.Open(filePath)
if err != nil {
return nil, err
}
defer file.Close()
var stmts []string
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}
return stmts, nil
}
// dumpWatchedAddressesStmts dumps sql statements to the given file
func dumpWatchedAddressesStmts(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err)
}
defer file.Close()
for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return nil return nil
} }
// ClearWatchedAddresses clears all the watched addresses from the database // parseWatchedAddressStmt parses given sql insert statement to extract the address argument
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { func parseWatchedAddressStmt(stmt string) (string, error) {
// TODO implement parseResult, err := pg_query.Parse(stmt)
return nil if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}
return parseResult.Stmts[0].Stmt.GetInsertStmt().SelectStmt.GetSelectStmt().ValuesLists[0].GetList().Items[0].GetAConst().GetVal().GetString_().Str, nil
} }