Add mode to write to CSV files in statediff file writer #249

Merged
nikugogoi merged 8 commits from ng-file-csv into v1.10.19-statediff-v4 2022-06-29 11:47:57 +00:00
12 changed files with 737 additions and 193 deletions
Showing only changes of commit 09af53bb94

View File

@@ -1,4 +1,4 @@
version: '3.2'
version: "3.2"
services:
migrations:

View File

@@ -64,7 +64,7 @@ var TestConfig = Config{
Mode: CSV,
OutputDir: "./statediffing_test",
FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",

View File

@@ -33,9 +33,11 @@ import (
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/types"
)
const dbDirectory = "/file"
const pgCopyStatement = `COPY %s FROM '%s' CSV`
func setupCSVLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
@@ -81,7 +83,6 @@ func setupCSVLegacy(t *testing.T) {
}
func dumpCSVFileData(t *testing.T) {
pgCopyStatement := `COPY %s FROM '%s' CSV`
outputDir := filepath.Join(dbDirectory, file.TestConfig.OutputDir)
for _, tbl := range file.Tables {
@@ -102,6 +103,14 @@
}
}
func dumpWatchedAddressesCSVFileData(t *testing.T) {
outputFilePath := filepath.Join(dbDirectory, file.TestConfig.WatchedAddressesFilePath)
stm := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)
_, err = sqlxdb.Exec(stm)
require.NoError(t, err)
}
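For reference, with the test defaults above (dbDirectory = "/file" and the .csv watched-addresses path), the statement this helper formats and executes resolves to the following; a sketch, not output captured from a run:

    stm := fmt.Sprintf(pgCopyStatement, "eth_meta.watched_addresses", "/file/statediffing_watched_addresses_test_file.csv")
    // stm == "COPY eth_meta.watched_addresses FROM '/file/statediffing_watched_addresses_test_file.csv' CSV"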
func tearDownCSV(t *testing.T) {
file.TearDownDB(t, sqlxdb)

View File

@@ -19,6 +19,7 @@ package file_test
import (
"context"
"errors"
"math/big"
"os"
"testing"
@@ -26,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
@@ -44,6 +46,7 @@ import (
func setupCSVIndexer(t *testing.T) {
file.TestConfig.Mode = file.CSV
file.TestConfig.OutputDir = "./statediffing_test"
file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.csv"
if _, err := os.Stat(file.TestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.TestConfig.OutputDir)
@@ -620,3 +623,349 @@ func TestCSVFileIndexer(t *testing.T) {
}
})
}
func TestCSVFileWatchAddressMethods(t *testing.T) {
setupCSVIndexer(t)
defer tearDownCSV(t)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Load watched addresses (empty table)", func(t *testing.T) {
expectedData := []common.Address{}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Insert watched addresses", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
err = ind.RemoveWatchedAddresses(args)
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Set watched addresses", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
err = ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Load watched addresses", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
expectedData := []common.Address{
common.HexToAddress(contract4Address),
common.HexToAddress(contract2Address),
common.HexToAddress(contract3Address),
}
rows, err := ind.LoadWatchedAddresses()
require.NoError(t, err)
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
defer file.TearDownDB(t, sqlxdb)
expectedData := []res{}
err = ind.ClearWatchedAddresses()
require.NoError(t, err)
dumpWatchedAddressesCSVFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
require.Equal(t, expectedData[idx], row)
}
})
}

View File

@@ -18,7 +18,9 @@ package file
import (
"encoding/csv"
"errors"
"fmt"
"math/big"
"os"
"path/filepath"
"strconv"
@@ -26,7 +28,9 @@ import (
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
@@ -55,8 +59,11 @@ type tableRow struct {
}
type CSVWriter struct {
dir string // dir containing output files
writers fileWriters
// dir containing output files
dir string
writers fileWriters
watchedAddressesWriter fileWriter
rows chan tableRow
flushChan chan struct{}
@@ -127,22 +134,30 @@ func (tx fileWriters) flush() error {
return nil
}
func NewCSVWriter(path string) (*CSVWriter, error) {
func NewCSVWriter(path string, watchedAddressesFilePath string) (*CSVWriter, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, fmt.Errorf("unable to make MkdirAll for path: %s err: %s", path, err)
}
writers, err := makeFileWriters(path, Tables)
if err != nil {
return nil, err
}
watchedAddressesWriter, err := newFileWriter(watchedAddressesFilePath)
if err != nil {
return nil, err
}
csvWriter := &CSVWriter{
writers: writers,
dir: path,
rows: make(chan tableRow),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
writers: writers,
watchedAddressesWriter: watchedAddressesWriter,
dir: path,
rows: make(chan tableRow),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
}
return csvWriter, nil
}
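A minimal usage sketch of the new two-path constructor (paths here are the package defaults; error handling shortened):

    // table CSVs go under the output dir, watched addresses to their own flat file
    writer, err := file.NewCSVWriter("./statediff_output", "./statediff-watched-addresses.csv")
    if err != nil {
        return err
    }
    writer.Loop()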
@@ -308,3 +323,129 @@ func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey)
csw.rows <- tableRow{types.TableStorageNode, values}
}
// loadWatchedAddresses loads watched addresses from a file
func (csw *CSVWriter) loadWatchedAddresses() ([]common.Address, error) {
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
// load csv rows from watched addresses file
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the csv rows
watchedAddresses := funk.Map(rows, func(row []string) common.Address {
// first column is for address in eth_meta.watched_addresses
addressString := row[0]
return common.HexToAddress(addressString)
}).([]common.Address)
return watchedAddresses, nil
}
// insertWatchedAddresses appends the given addresses to the watched addresses file
func (csw *CSVWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
// load csv rows from watched addresses file
watchedAddresses, err := csw.loadWatchedAddresses()
if err != nil {
return err
}
// append rows for new addresses to existing csv file
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, common.HexToAddress(arg.Address)) {
continue
}
var values []interface{}
values = append(values, arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
row := types.TableWatchedAddresses.ToCsvRow(values...)
// writing directly instead of using rows channel as it needs to be flushed immediately
err = csw.watchedAddressesWriter.Write(row)
if err != nil {
return err
}
}
// watched addresses need to be flushed immediately as they also need to be read from the file
csw.watchedAddressesWriter.Flush()
err = csw.watchedAddressesWriter.Error()
if err != nil {
return err
}
return nil
}
// removeWatchedAddresses removes the given watched addresses from the watched addresses file
func (csw *CSVWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
// load csv rows from watched addresses file
watchedAddressesFilePath := csw.watchedAddressesWriter.file.Name()
rows, err := loadWatchedAddressesRows(watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of rows having addresses to be removed
filteredRows := funk.Filter(rows, func(row []string) bool {
return !funk.Contains(args, func(arg types.WatchAddressArg) bool {
// Compare first column in table for address
return arg.Address == row[0]
})
}).([][]string)
return dumpWatchedAddressesRows(&csw.watchedAddressesWriter, filteredRows)
}
// setWatchedAddresses clears the watched addresses file and writes rows for the given addresses
func (csw *CSVWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
var rows [][]string
for _, arg := range args {
row := types.TableWatchedAddresses.ToCsvRow(arg.Address, strconv.FormatUint(arg.CreatedAt, 10), currentBlockNumber.String(), "0")
rows = append(rows, row)
}
return dumpWatchedAddressesRows(&csw.watchedAddressesWriter, rows)
}
// loadWatchedAddressesRows loads csv rows from the given file
func loadWatchedAddressesRows(filePath string) ([][]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return [][]string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
reader := csv.NewReader(file)
return reader.ReadAll()
}
// dumpWatchedAddressesRows truncates the watched addresses file and writes out the given csv rows
func dumpWatchedAddressesRows(watchedAddressesWriter *fileWriter, rows [][]string) error {
file := watchedAddressesWriter.file
file.Close()
file, err := os.Create(file.Name())
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
// rebind the writer to the fresh file handle; a pointer receiver is needed here
// so the CSVWriter's copy sees the new handle on subsequent writes
watchedAddressesWriter.Writer = csv.NewWriter(file)
watchedAddressesWriter.file = file
for _, row := range rows {
if err := watchedAddressesWriter.Write(row); err != nil {
return err
}
}
watchedAddressesWriter.Flush()
return watchedAddressesWriter.Error()
}
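Concretely, the watched-addresses file these helpers manage is a flat four-column CSV matching eth_meta.watched_addresses (address, created_at, watched_at, last_filled_at). A hypothetical file after watching two contracts at block 130, with last_filled_at still at its initial 0:

    0x5d663F5269090bD2A7DC2390c911dF6083D7b28F,123,130,0
    0x6Eb7e5C66DB8af2E96159AC9dBD4c7074e4EDF55,124,130,0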

View File

@@ -17,7 +17,6 @@
package file
import (
"bufio"
"context"
"errors"
"fmt"
@@ -30,8 +29,6 @@ import (
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -49,7 +46,7 @@ import (
const defaultOutputDir = "./statediff_output"
const defaultFilePath = "./statediff.sql"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.sql"
const defaultWatchedAddressesFilePath = "./statediff-watched-addresses.csv"
const watchedAddressesInsert = "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ('%s', '%d', '%d') ON CONFLICT (address) DO NOTHING;"
@@ -66,14 +63,19 @@ type StateDiffIndexer struct {
nodeID string
wg *sync.WaitGroup
removedCacheFlag *uint32
watchedAddressesFilePath string
}
// NewStateDiffIndexer creates a file-backed implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
var err error
var writer FileWriter
watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses to file", "file", watchedAddressesFilePath)
switch config.Mode {
case CSV:
outputDir := config.OutputDir
@@ -87,7 +89,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
log.Info("Writing statediff CSV files to directory", "file", outputDir)
writer, err = NewCSVWriter(outputDir)
writer, err = NewCSVWriter(outputDir, watchedAddressesFilePath)
if err != nil {
return nil, err
}
@@ -105,27 +107,20 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
}
log.Info("Writing statediff SQL statements to file", "file", filePath)
writer = NewSQLWriter(file)
writer = NewSQLWriter(file, watchedAddressesFilePath)
default:
return nil, fmt.Errorf("unrecognized file mode: %s", config.Mode)
}
watchedAddressesFilePath := config.WatchedAddressesFilePath
if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
wg := new(sync.WaitGroup)
writer.Loop()
writer.upsertNode(config.NodeInfo)
return &StateDiffIndexer{
fileWriter: writer,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
watchedAddressesFilePath: watchedAddressesFilePath,
fileWriter: writer,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
wg: wg,
}, nil
}
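For orientation, a sketch of constructing the indexer in CSV mode (field values are illustrative; NodeInfo and error handling omitted):

    cfg := file.Config{
        Mode:                     file.CSV,
        OutputDir:                "./statediff_output",
        WatchedAddressesFilePath: "./statediff-watched-addresses.csv",
    }
    ind, err := file.NewStateDiffIndexer(context.Background(), params.MainnetChainConfig, cfg)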
@@ -550,162 +545,25 @@ func (sdi *StateDiffIndexer) Close() error {
// LoadWatchedAddresses loads watched addresses from a file
func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}
return watchedAddresses, nil
return sdi.fileWriter.loadWatchedAddresses()
}
// InsertWatchedAddresses inserts the given addresses in a file
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return err
}
// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
watchedAddresses = append(watchedAddresses, addressString)
}
// append statements for new addresses to existing statements
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
return sdi.fileWriter.insertWatchedAddresses(args, currentBlockNumber)
}
// RemoveWatchedAddresses removes the given watched addresses from a file
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sdi.watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of statements having addresses to be removed
var filteredStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
toRemove := funk.Contains(args, func(arg sdtypes.WatchAddressArg) bool {
return arg.Address == addressString
})
if !toRemove {
filteredStmts = append(filteredStmts, stmt)
}
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, filteredStmts)
return sdi.fileWriter.removeWatchedAddresses(args)
}
// SetWatchedAddresses clears and inserts the given addresses in a file
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sdi.watchedAddressesFilePath, stmts)
return sdi.fileWriter.setWatchedAddresses(args, currentBlockNumber)
}
// ClearWatchedAddresses clears all the watched addresses from a file
func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return sdi.SetWatchedAddresses([]sdtypes.WatchAddressArg{}, big.NewInt(0))
}
// loadWatchedAddressesStatements loads sql statements from the given file in a string slice
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
stmts := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}
return stmts, nil
}
// dumpWatchedAddressesStatements dumps sql statements to the given file
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
defer file.Close()
for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return nil
}
// parseWatchedAddressStatement parses given sql insert statement to extract the address argument
func parseWatchedAddressStatement(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt)
if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}
// extract address argument from parse output for a SQL statement of form
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
SelectStmt.GetSelectStmt().
ValuesLists[0].GetList().
Items[0].GetAConst().
GetVal().
GetString_().
Str
return addressString, nil
}

View File

@@ -25,7 +25,6 @@ import (
"github.com/ipfs/go-cid"
"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
@@ -190,13 +189,3 @@ func resetDB(t *testing.T) {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
resetDB(t)
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}

View File

@@ -17,10 +17,14 @@
package file
import (
"math/big"
node "github.com/ipfs/go-ipld-format"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/types"
)
// Writer interface required by the file indexer
@@ -47,4 +51,10 @@ type FileWriter interface {
upsertIPLDDirect(blockNumber, key string, value []byte)
upsertIPLDNode(blockNumber string, i node.Node)
upsertIPLDRaw(blockNumber string, codec, mh uint64, raw []byte) (string, string, error)
// Methods to read and write watched addresses
loadWatchedAddresses() ([]common.Address, error)
insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
removeWatchedAddresses(args []types.WatchAddressArg) error
setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error
}
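Since both writers now have to satisfy the widened interface, a compile-time assertion (not part of this diff) is a cheap guard against a missing method; a minimal sketch:

    var (
        _ FileWriter = &CSVWriter{}
        _ FileWriter = &SQLWriter{}
    )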

View File

@@ -81,6 +81,16 @@ func dumpFileData(t *testing.T) {
require.NoError(t, err)
}
func resetAndDumpWatchedAddressesFileData(t *testing.T) {
resetDB(t)
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}
func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb)

View File

@@ -45,6 +45,7 @@ import (
func setupIndexer(t *testing.T) {
file.TestConfig.Mode = file.SQL
file.TestConfig.WatchedAddressesFilePath = "./statediffing_watched_addresses_test_file.sql"
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath)
@@ -621,7 +622,7 @@ func TestSQLFileIndexer(t *testing.T) {
})
}
func TestFileWatchAddressMethods(t *testing.T) {
func TestSQLFileWatchAddressMethods(t *testing.T) {
setupIndexer(t)
defer tearDown(t)

View File

@@ -17,17 +17,24 @@
package file
import (
"bufio"
"errors"
"fmt"
"io"
"math/big"
"os"
blockstore "github.com/ipfs/go-ipfs-blockstore"
dshelp "github.com/ipfs/go-ipfs-ds-help"
node "github.com/ipfs/go-ipld-format"
pg_query "github.com/pganalyze/pg_query_go/v2"
"github.com/thoas/go-funk"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/types"
)
var (
@@ -47,18 +54,21 @@ type SQLWriter struct {
flushFinished chan struct{}
quitChan chan struct{}
doneChan chan struct{}
watchedAddressesFilePath string
}
// NewSQLWriter creates a new SQLWriter
func NewSQLWriter(wc io.WriteCloser) *SQLWriter {
func NewSQLWriter(wc io.WriteCloser, watchedAddressesFilePath string) *SQLWriter {
return &SQLWriter{
wc: wc,
stmts: make(chan []byte),
collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
wc: wc,
stmts: make(chan []byte),
collatedStmt: make([]byte, writeBufferSize),
flushChan: make(chan struct{}),
flushFinished: make(chan struct{}),
quitChan: make(chan struct{}),
doneChan: make(chan struct{}),
watchedAddressesFilePath: watchedAddressesFilePath,
}
}
@@ -257,3 +267,160 @@ func (sqw *SQLWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
sqw.stmts <- []byte(fmt.Sprintf(storageInsert, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID,
storageCID.Path, storageCID.NodeType, true, storageCID.MhKey))
}
// loadWatchedAddresses loads watched addresses from a file
func (sqw *SQLWriter) loadWatchedAddresses() ([]common.Address, error) {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return nil, err
}
// extract addresses from the sql statements
watchedAddresses := []common.Address{}
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return nil, err
}
watchedAddresses = append(watchedAddresses, common.HexToAddress(addressString))
}
return watchedAddresses, nil
}
// insertWatchedAddresses appends insert statements for the given addresses to the watched addresses file
func (sqw *SQLWriter) insertWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return err
}
// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
watchedAddresses = append(watchedAddresses, addressString)
}
// append statements for new addresses to existing statements
for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
}
// removeWatchedAddresses removes the given watched addresses from the watched addresses file
func (sqw *SQLWriter) removeWatchedAddresses(args []types.WatchAddressArg) error {
// load sql statements from watched addresses file
stmts, err := loadWatchedAddressesStatements(sqw.watchedAddressesFilePath)
if err != nil {
return err
}
// get rid of statements having addresses to be removed
var filteredStmts []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStatement(stmt)
if err != nil {
return err
}
toRemove := funk.Contains(args, func(arg types.WatchAddressArg) bool {
return arg.Address == addressString
})
if !toRemove {
filteredStmts = append(filteredStmts, stmt)
}
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, filteredStmts)
}
// setWatchedAddresses clears the watched addresses file and writes statements for the given addresses
func (sqw *SQLWriter) setWatchedAddresses(args []types.WatchAddressArg, currentBlockNumber *big.Int) error {
var stmts []string
for _, arg := range args {
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt)
}
return dumpWatchedAddressesStatements(sqw.watchedAddressesFilePath, stmts)
}
// loadWatchedAddressesStatements loads sql statements from the given file into a string slice
func loadWatchedAddressesStatements(filePath string) ([]string, error) {
file, err := os.Open(filePath)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
return nil, fmt.Errorf("error opening watched addresses file: %v", err)
}
defer file.Close()
stmts := []string{}
scanner := bufio.NewScanner(file)
for scanner.Scan() {
stmts = append(stmts, scanner.Text())
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("error loading watched addresses: %v", err)
}
return stmts, nil
}
// dumpWatchedAddressesStatements dumps sql statements to the given file
func dumpWatchedAddressesStatements(filePath string, stmts []string) error {
file, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("error creating watched addresses file: %v", err)
}
defer file.Close()
for _, stmt := range stmts {
_, err := file.Write([]byte(stmt + "\n"))
if err != nil {
return fmt.Errorf("error inserting watched_addresses entry: %v", err)
}
}
return nil
}
// parseWatchedAddressStatement parses given sql insert statement to extract the address argument
func parseWatchedAddressStatement(stmt string) (string, error) {
parseResult, err := pg_query.Parse(stmt)
if err != nil {
return "", fmt.Errorf("error parsing sql stmt: %v", err)
}
// extract address argument from parse output for a SQL statement of form
// "INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
// VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;"
addressString := parseResult.Stmts[0].Stmt.GetInsertStmt().
SelectStmt.GetSelectStmt().
ValuesLists[0].GetList().
Items[0].GetAConst().
GetVal().
GetString_().
Str
return addressString, nil
}
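Round-tripping one entry through this file format, using the watchedAddressesInsert template shown earlier (values made up):

    stmt := fmt.Sprintf(watchedAddressesInsert, "0xabc", 123, 130)
    // INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at)
    // VALUES ('0xabc', '123', '130') ON CONFLICT (address) DO NOTHING;
    addr, err := parseWatchedAddressStatement(stmt)
    // addr == "0xabc", err == nil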

View File

@@ -172,3 +172,13 @@ var TableStateAccount = Table{
{name: "storage_root", typ: varchar},
},
}
var TableWatchedAddresses = Table{
"eth_meta.watched_addresses",
[]column{
{name: "address", typ: varchar},
{name: "created_at", typ: bigint},
{name: "watched_at", typ: bigint},
{name: "last_filled_at", typ: bigint},
},
}
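The CSV writer feeds this table through ToCsvRow (seen in insertWatchedAddresses above); assuming ToCsvRow simply stringifies its arguments in declared column order, a single watched-address row would come out as:

    row := types.TableWatchedAddresses.ToCsvRow("0xabc", "123", "130", "0")
    // row == []string{"0xabc", "123", "130", "0"}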