Add file mode indexer unit tests for watched address methods

This commit is contained in:
Prathamesh Musale 2022-03-18 19:49:41 +05:30
parent a7f9354f0e
commit e7526469db
5 changed files with 409 additions and 31 deletions

View File

@ -43,4 +43,5 @@ var TestConfig = Config{
ID: "mockNodeID", ID: "mockNodeID",
ClientName: "go-ethereum", ClientName: "go-ethereum",
}, },
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
} }

View File

@ -86,7 +86,14 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c
if watchedAddressesFilePath == "" { if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesFilePath watchedAddressesFilePath = defaultWatchedAddressesFilePath
} }
log.Info("Writing watched addresses SQL statements to file", "file", filePath) if _, err := os.Stat(watchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create watched addresses file, file (%s) already exists", watchedAddressesFilePath)
}
_, err = os.Create(watchedAddressesFilePath)
if err != nil {
return nil, fmt.Errorf("unable to create file (%s), err: %v", filePath, err)
}
log.Info("Writing watched addresses SQL statements to file", "file", watchedAddressesFilePath)
w := NewSQLWriter(file) w := NewSQLWriter(file)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
@ -524,8 +531,24 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
return err return err
} }
// get already watched addresses
var watchedAddresses []string
for _, stmt := range stmts {
addressString, err := parseWatchedAddressStmt(stmt)
if err != nil {
return err
}
watchedAddresses = append(watchedAddresses, addressString)
}
// append statements for new addresses to existing statements // append statements for new addresses to existing statements
for _, arg := range args { for _, arg := range args {
// ignore if already watched
if funk.Contains(watchedAddresses, arg.Address) {
continue
}
stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64()) stmt := fmt.Sprintf(watchedAddressesInsert, arg.Address, arg.CreatedAt, currentBlockNumber.Uint64())
stmts = append(stmts, stmt) stmts = append(stmts, stmt)
} }
@ -579,18 +602,13 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
// loadWatchedAddressesStmts loads sql statements from the given file in a string slice // loadWatchedAddressesStmts loads sql statements from the given file in a string slice
func loadWatchedAddressesStmts(filePath string) ([]string, error) { func loadWatchedAddressesStmts(filePath string) ([]string, error) {
// return emtpy if file does not exist
if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) {
return []string{}, nil
}
file, err := os.Open(filePath) file, err := os.Open(filePath)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error opening watched addresses file: %v", err)
} }
defer file.Close() defer file.Close()
var stmts []string stmts := []string{}
scanner := bufio.NewScanner(file) scanner := bufio.NewScanner(file)
for scanner.Scan() { for scanner.Scan() {
stmts = append(stmts, scanner.Text()) stmts = append(stmts, scanner.Text())
@ -607,7 +625,7 @@ func loadWatchedAddressesStmts(filePath string) ([]string, error) {
func dumpWatchedAddressesStmts(filePath string, stmts []string) error { func dumpWatchedAddressesStmts(filePath string, stmts []string) error {
file, err := os.Create(filePath) file, err := os.Create(filePath)
if err != nil { if err != nil {
return fmt.Errorf("error creating watched addresses file (%s): %v", filePath, err) return fmt.Errorf("error creating watched addresses file: %v", err)
} }
defer file.Close() defer file.Close()

View File

@ -81,7 +81,7 @@ func setupLegacy(t *testing.T) {
} }
} }
func dumpData(t *testing.T) { func dumpFileData(t *testing.T) {
sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath) sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath)
require.NoError(t, err) require.NoError(t, err)
@ -89,10 +89,35 @@ func dumpData(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
func dumpWatchedAddressesFileData(t *testing.T) {
resetDB(t)
sqlFileBytes, err := os.ReadFile(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
_, err = sqlxdb.Exec(string(sqlFileBytes))
require.NoError(t, err)
}
func resetDB(t *testing.T) {
file.TearDownDB(t, sqlxdb)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func tearDown(t *testing.T) { func tearDown(t *testing.T) {
file.TearDownDB(t, sqlxdb) file.TearDownDB(t, sqlxdb)
err := os.Remove(file.TestConfig.FilePath) err := os.Remove(file.TestConfig.FilePath)
require.NoError(t, err) require.NoError(t, err)
err = os.Remove(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
err = sqlxdb.Close() err = sqlxdb.Close()
require.NoError(t, err) require.NoError(t, err)
} }
@ -106,7 +131,7 @@ func expectTrue(t *testing.T, value bool) {
func TestFileIndexerLegacy(t *testing.T) { func TestFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) { t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacy(t) setupLegacy(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, coinbase pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids FROM eth.header_cids

View File

@ -21,6 +21,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math/big"
"os" "os"
"testing" "testing"
@ -28,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared" "github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ipfs/go-cid" "github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore" blockstore "github.com/ipfs/go-ipfs-blockstore"
@ -51,12 +53,15 @@ var (
ind interfaces.StateDiffIndexer ind interfaces.StateDiffIndexer
ipfsPgGet = `SELECT data FROM public.blocks ipfsPgGet = `SELECT data FROM public.blocks
WHERE key = $1` WHERE key = $1`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
mockBlock *types.Block mockBlock *types.Block
headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte rctLeaf1, rctLeaf2, rctLeaf3, rctLeaf4, rctLeaf5 []byte
state1CID, state2CID, storageCID cid.Cid state1CID, state2CID, storageCID cid.Cid
contract1Address, contract2Address, contract3Address, contract4Address string
contract1CreatedAt, contract2CreatedAt, contract3CreatedAt, contract4CreatedAt uint64
lastFilledAt, watchedAt1, watchedAt2, watchedAt3 uint64
) )
func init() { func init() {
@ -161,15 +166,45 @@ func init() {
rctLeaf3 = orderedRctLeafNodes[2] rctLeaf3 = orderedRctLeafNodes[2]
rctLeaf4 = orderedRctLeafNodes[3] rctLeaf4 = orderedRctLeafNodes[3]
rctLeaf5 = orderedRctLeafNodes[4] rctLeaf5 = orderedRctLeafNodes[4]
contract1Address = "0x5d663F5269090bD2A7DC2390c911dF6083D7b28F"
contract2Address = "0x6Eb7e5C66DB8af2E96159AC440cbc8CDB7fbD26B"
contract3Address = "0xcfeB164C328CA13EFd3C77E1980d94975aDfedfc"
contract4Address = "0x0Edf0c4f393a628DE4828B228C48175b3EA297fc"
contract1CreatedAt = uint64(1)
contract2CreatedAt = uint64(2)
contract3CreatedAt = uint64(3)
contract4CreatedAt = uint64(4)
lastFilledAt = uint64(0)
watchedAt1 = uint64(10)
watchedAt2 = uint64(15)
watchedAt3 = uint64(20)
} }
func setup(t *testing.T) { func setupIndexer(t *testing.T) {
if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.FilePath) err := os.Remove(file.TestConfig.FilePath)
require.NoError(t, err) require.NoError(t, err)
} }
if _, err := os.Stat(file.TestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
err := os.Remove(file.TestConfig.WatchedAddressesFilePath)
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig) ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig)
require.NoError(t, err) require.NoError(t, err)
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}
func setup(t *testing.T) {
setupIndexer(t)
var tx interfaces.Batch var tx interfaces.Batch
tx, err = ind.PushBlock( tx, err = ind.PushBlock(
mockBlock, mockBlock,
@ -192,19 +227,12 @@ func setup(t *testing.T) {
} }
test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
} }
func TestFileIndexer(t *testing.T) { func TestFileIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
pgStr := `SELECT cid, td, reward, block_hash, coinbase pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids FROM eth.header_cids
@ -242,7 +270,7 @@ func TestFileIndexer(t *testing.T) {
}) })
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
// check that txs were properly indexed and published // check that txs were properly indexed and published
@ -370,7 +398,7 @@ func TestFileIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
rcts := make([]string, 0) rcts := make([]string, 0)
@ -426,7 +454,7 @@ func TestFileIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
// check receipts were properly indexed and published // check receipts were properly indexed and published
@ -527,7 +555,7 @@ func TestFileIndexer(t *testing.T) {
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
// check that state nodes were properly indexed and published // check that state nodes were properly indexed and published
@ -618,7 +646,7 @@ func TestFileIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpData(t) dumpFileData(t)
defer tearDown(t) defer tearDown(t)
// check that storage nodes were properly indexed // check that storage nodes were properly indexed
@ -688,3 +716,305 @@ func TestFileIndexer(t *testing.T) {
test_helpers.ExpectEqual(t, data, []byte{}) test_helpers.ExpectEqual(t, data, []byte{})
}) })
} }
func TestFileWatchAddressMethods(t *testing.T) {
setupIndexer(t)
defer tearDown(t)
type res struct {
Address string `db:"address"`
CreatedAt uint64 `db:"created_at"`
WatchedAt uint64 `db:"watched_at"`
LastFilledAt uint64 `db:"last_filled_at"`
}
pgStr := "SELECT * FROM eth_meta.watched_addresses"
t.Run("Insert watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt1)))
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Insert watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.InsertWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt1,
LastFilledAt: lastFilledAt,
},
}
ind.RemoveWatchedAddresses(args)
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Remove watched addresses (some non-watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
}
expectedData := []res{}
ind.RemoveWatchedAddresses(args)
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract1Address,
CreatedAt: contract1CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt2,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt2)))
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Set watched addresses (some already watched)", func(t *testing.T) {
args := []sdtypes.WatchAddressArg{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
},
}
expectedData := []res{
{
Address: contract4Address,
CreatedAt: contract4CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract2Address,
CreatedAt: contract2CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
{
Address: contract3Address,
CreatedAt: contract3CreatedAt,
WatchedAt: watchedAt3,
LastFilledAt: lastFilledAt,
},
}
ind.SetWatchedAddresses(args, big.NewInt(int64(watchedAt3)))
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses()
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
t.Run("Clear watched addresses (empty table)", func(t *testing.T) {
expectedData := []res{}
ind.ClearWatchedAddresses()
dumpWatchedAddressesFileData(t)
rows := []res{}
err = sqlxdb.Select(&rows, pgStr)
if err != nil {
t.Fatal(err)
}
expectTrue(t, len(rows) == len(expectedData))
for idx, row := range rows {
test_helpers.ExpectEqual(t, row, expectedData[idx])
}
})
}

View File

@ -57,6 +57,10 @@ func TearDownDB(t *testing.T, db *sqlx.DB) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
_, err = tx.Exec(`DELETE FROM eth_meta.watched_addresses`)
if err != nil {
t.Fatal(err)
}
err = tx.Commit() err = tx.Commit()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)