diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index 33fd8cd17..ccf2c2e5f 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -43,4 +43,5 @@ var TestConfig = Config{ ID: "mockNodeID", ClientName: "go-ethereum", }, + WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql", } diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index ad497d2a7..490b147c4 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -86,7 +86,14 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c if watchedAddressesFilePath == "" { watchedAddressesFilePath = defaultWatchedAddressesFilePath } - log.Info("Writing watched addresses SQL statements to file", "file", filePath) + if _, err := os.Stat(watchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) { + return nil, fmt.Errorf("cannot create watched addresses file, file (%s) already exists", watchedAddressesFilePath) + } + _, err = os.Create(watchedAddressesFilePath) + if err != nil { + return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err) + } + log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath) w := NewSQLWriter(file) wg := new(sync.WaitGroup) @@ -524,8 +531,24 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA return err } + // get already watched addresses + var watchedAddresses []string + for _, stmt := range stmts { + addressString, err := parseWatchedAddressStmt(stmt) + if err != nil { + return err + } + + watchedAddresses = append(watchedAddresses, addressString) + } + // append statements for new addresses to existing statements for _, arg := range args { + // ignore if already watched + if funk.Contains(watchedAddresses, arg.Address) { + continue + } + stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) stmts = append(stmts, stmt) } @@ -579,18 +602,13 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error { // loadWatchedAddressesStmts loads sql statements from the given file in a string slice func loadWatchedAddressesStmts(filePath string) ([]string, error) { - // return emtpy if file does not exist - if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { - return []string{}, nil - } - file, err := os.Open(filePath) if err != nil { - return nil, err + return nil, fmt.Errorf("error opening watched addresses file: %v", err) } defer file.Close() - var stmts []string + stmts := []string{} scanner := bufio.NewScanner(file) for scanner.Scan() { stmts = append(stmts, scanner.Text()) @@ -607,7 +625,7 @@ func loadWatchedAddressesStmts(filePath string) ([]string, error) { func dumpWatchedAddressesStmts(filePath string, stmts []string) error { file, err := os.Create(filePath) if err != nil { - return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err) + return fmt.Errorf("error creating watched addresses file: %v", err) } defer file.Close() diff --git a/statediff/indexer/database/file/indexer_legacy_test.go b/statediff/indexer/database/file/indexer_legacy_test.go index 56bca2683..aa5a4ae41 100644 --- a/statediff/indexer/database/file/indexer_legacy_test.go +++ b/statediff/indexer/database/file/indexer_legacy_test.go @@ -81,7 +81,7 @@ func setupLegacy(t *testing.T) { } } -func dumpData(t *testing.T) { +func dumpFileData(t *testing.T) { sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath) require.NoError(t, err) @@ -89,10 +89,35 @@ func dumpData(t *testing.T) { require.NoError(t, err) } +func dumpWatchedAddressesFileData(t *testing.T) { + resetDB(t) + + sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath) + require.NoError(t, err) + + _, err = sqlxdb.Exec(string(sqlFileBytes)) + require.NoError(t, err) +} + +func resetDB(t *testing.T) { + file.TearDownDB(t, sqlxdb) + + connStr := postgres.DefaultConfig.DbConnectionString() + sqlxdb, err = sqlx.Connect("postgres", connStr) + if err != nil { + t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) + } +} + func tearDown(t *testing.T) { file.TearDownDB(t, sqlxdb) + err := os.Remove(file.TestConfig.FilePath) require.NoError(t, err) + + err = os.Remove(file.TestConfig.WatchedAddressesFilePath) + require.NoError(t, err) + err = sqlxdb.Close() require.NoError(t, err) } @@ -106,7 +131,7 @@ func expectTrue(t *testing.T, value bool) { func TestFileIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs", func(t *testing.T) { setupLegacy(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) pgStr := `SELECT cid, td, reward, block_hash, coinbase FROM eth.header_cids diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go index ef849e8e8..99caa20fc 100644 --- a/statediff/indexer/database/file/indexer_test.go +++ b/statediff/indexer/database/file/indexer_test.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math/big" "os" "testing" @@ -28,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/shared" + sdtypes "github.com/ethereum/go-ethereum/statediff/types" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -51,12 +53,15 @@ var ( ind interfaces.StateDiffIndexer ipfsPgGet = `SELECT data FROM public.blocks WHERE key = $1` - tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte - mockBlock *types.Block - headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid - rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid - rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte - state1CID, state2CID, storageCID cid.Cid + tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte + mockBlock *types.Block + headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid + rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid + rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte + state1CID, state2CID, storageCID cid.Cid + contract1Address, contract2Address, contract3Address, contract4Address string + contract1CreatedAt, contract2CreatedAt, contract3CreatedAt, contract4CreatedAt uint64 + lastFilledAt, watchedAt1, watchedAt2, watchedAt3 uint64 ) func init() { @@ -161,15 +166,45 @@ func init() { rctLeaf3 = orderedRctLeafNodes[2] rctLeaf4 = orderedRctLeafNodes[3] rctLeaf5 = orderedRctLeafNodes[4] + + contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F" + contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B" + contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc" + contract4Address = "0x0Edf0c4f393a628DE4828B228C48175b3EA297fc" + contract1CreatedAt = uint64(1) + contract2CreatedAt = uint64(2) + contract3CreatedAt = uint64(3) + contract4CreatedAt = uint64(4) + + lastFilledAt = uint64(0) + watchedAt1 = uint64(10) + watchedAt2 = uint64(15) + watchedAt3 = uint64(20) } -func setup(t *testing.T) { +func setupIndexer(t *testing.T) { if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { err := os.Remove(file.TestConfig.FilePath) require.NoError(t, err) } + + if _, err := os.Stat(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) { + err := os.Remove(file.TestConfig.WatchedAddressesFilePath) + require.NoError(t, err) + } + ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig) require.NoError(t, err) + + connStr := postgres.DefaultConfig.DbConnectionString() + sqlxdb, err = sqlx.Connect("postgres", connStr) + if err != nil { + t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) + } +} + +func setup(t *testing.T) { + setupIndexer(t) var tx interfaces.Batch tx, err = ind.PushBlock( mockBlock, @@ -192,19 +227,12 @@ func setup(t *testing.T) { } test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) - - connStr := postgres.DefaultConfig.DbConnectionString() - - sqlxdb, err = sqlx.Connect("postgres", connStr) - if err != nil { - t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) - } } func TestFileIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) pgStr := `SELECT cid, td, reward, block_hash, coinbase FROM eth.header_cids @@ -242,7 +270,7 @@ func TestFileIndexer(t *testing.T) { }) t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) // check that txs were properly indexed and published @@ -370,7 +398,7 @@ func TestFileIndexer(t *testing.T) { t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) rcts := make([]string, 0) @@ -426,7 +454,7 @@ func TestFileIndexer(t *testing.T) { t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) // check receipts were properly indexed and published @@ -527,7 +555,7 @@ func TestFileIndexer(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) // check that state nodes were properly indexed and published @@ -618,7 +646,7 @@ func TestFileIndexer(t *testing.T) { t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { setup(t) - dumpData(t) + dumpFileData(t) defer tearDown(t) // check that storage nodes were properly indexed @@ -688,3 +716,305 @@ func TestFileIndexer(t *testing.T) { test_helpers.ExpectEqual(t, data, []byte{}) }) } + +func TestFileWatchAddressMethods(t *testing.T) { + setupIndexer(t) + defer tearDown(t) + + type res struct { + Address string `db:"address"` + CreatedAt uint64 `db:"created_at"` + WatchedAt uint64 `db:"watched_at"` + LastFilledAt uint64 `db:"last_filled_at"` + } + pgStr := "SELECT * FROM eth_meta.watched_addresses" + + t.Run("Insert watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1))) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Insert watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt1, + LastFilledAt: lastFilledAt, + }, + } + + ind.RemoveWatchedAddresses(args) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + } + expectedData := []res{} + + ind.RemoveWatchedAddresses(args) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract1Address, + CreatedAt: contract1CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt2, + LastFilledAt: lastFilledAt, + }, + } + + ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2))) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Set watched addresses (some already watched)", func(t *testing.T) { + args := []sdtypes.WatchAddressArg{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + }, + } + expectedData := []res{ + { + Address: contract4Address, + CreatedAt: contract4CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract2Address, + CreatedAt: contract2CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + { + Address: contract3Address, + CreatedAt: contract3CreatedAt, + WatchedAt: watchedAt3, + LastFilledAt: lastFilledAt, + }, + } + + ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3))) + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses", func(t *testing.T) { + expectedData := []res{} + + ind.ClearWatchedAddresses() + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) + + t.Run("Clear watched addresses (empty table)", func(t *testing.T) { + expectedData := []res{} + + ind.ClearWatchedAddresses() + dumpWatchedAddressesFileData(t) + + rows := []res{} + err = sqlxdb.Select(&rows, pgStr) + if err != nil { + t.Fatal(err) + } + + expectTrue(t, len(rows) == len(expectedData)) + for idx, row := range rows { + test_helpers.ExpectEqual(t, row, expectedData[idx]) + } + }) +} diff --git a/statediff/indexer/database/file/test_helpers.go b/statediff/indexer/database/file/test_helpers.go index 27d204d55..27a1581a4 100644 --- a/statediff/indexer/database/file/test_helpers.go +++ b/statediff/indexer/database/file/test_helpers.go @@ -57,6 +57,10 @@ func TearDownDB(t *testing.T, db *sqlx.DB) { if err != nil { t.Fatal(err) } + _, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses`) + if err != nil { + t.Fatal(err) + } err = tx.Commit() if err != nil { t.Fatal(err)