From ea6dfb8fa5a30ec8a35079944dc03ba398009a6c Mon Sep 17 00:00:00 2001 From: i-norden Date: Mon, 15 May 2023 19:52:22 -0500 Subject: [PATCH 1/3] fix isWatchedAddress and isValidPath usage --- statediff/builder.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/statediff/builder.go b/statediff/builder.go index 291cdbb8a..6299edb23 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -200,7 +200,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, it, itCount := trie.NewDifferenceIterator(a, b) for it.Next(true) { // ignore node if it is not along paths of interest - if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) { + if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, append(prefixPath, it.Path()...)) { continue } // index values by leaf key @@ -234,8 +234,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, return nil, err } if ok { - nodePath := make([]byte, len(it.Path())) - copy(nodePath, it.Path()) + nodePath := append(prefixPath, it.Path()...) partialPath := trie.CompactToHex(elements[0].([]byte)) valueNodePath := append(nodePath, partialPath...) if !isWatchedAddress(watchedAddressesLeafPaths, valueNodePath) { @@ -262,7 +261,7 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, watchedAddressesLeafPaths [][]byte, prefixPath []byte) (*types2.AccountWrapper, error) { // skip if it is not a watched address // If we aren't watching any specific addresses, we are watching everything - if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, it.Path()) { + if len(watchedAddressesLeafPaths) > 0 && !isWatchedAddress(watchedAddressesLeafPaths, append(prefixPath, it.Path()...)) { return nil, nil } @@ -310,7 +309,7 @@ func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffA it, _ := trie.NewDifferenceIterator(b, a) for it.Next(true) { // ignore node if it is not along paths of interest - if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, it.Path()) { + if watchingAddresses && !isValidPrefixPath(watchedAddressesLeafPaths, append(prefixPath, it.Path()...)) { continue } -- 2.45.2 From 3555790e7686141ece307527f48341e67a8c0df4 Mon Sep 17 00:00:00 2001 From: i-norden Date: Thu, 30 Mar 2023 18:41:01 -0500 Subject: [PATCH 2/3] Small fixes replace new(trie.Trie) reduce debug noise patch postgres config - add driver, rename DefaultConfig => TestConfig --- statediff/indexer/database/metrics/metrics.go | 2 +- .../indexer/database/sql/pgx_indexer_legacy_test.go | 2 +- statediff/indexer/database/sql/pgx_indexer_test.go | 8 ++++---- statediff/indexer/database/sql/postgres/config.go | 5 +++-- statediff/indexer/database/sql/postgres/pgx_test.go | 4 ++-- statediff/indexer/database/sql/postgres/sqlx_test.go | 6 +++--- .../indexer/database/sql/postgres/test_helpers.go | 2 +- statediff/service_test.go | 10 +++++----- 8 files changed, 20 insertions(+), 19 deletions(-) diff --git a/statediff/indexer/database/metrics/metrics.go b/statediff/indexer/database/metrics/metrics.go index 34cb3a69b..6174e203c 100644 --- a/statediff/indexer/database/metrics/metrics.go +++ b/statediff/indexer/database/metrics/metrics.go @@ -253,7 +253,7 @@ func (met *dbMetricsHandles) Update(stats DbStats) { func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { since := UpdateDuration(start, timer) - logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) + logger.Trace(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) } func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration { diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 80094a8d0..b07987754 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -28,7 +28,7 @@ import ( ) func setupLegacyPGXIndexer(t *testing.T) { - db, err = postgres.SetupPGXDB(postgres.DefaultConfig) + db, err = postgres.SetupPGXDB(postgres.TestConfig) if err != nil { t.Fatal(err) } diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index c0ce57c1f..27e9f2080 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -39,7 +39,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) { } func setupPGX(t *testing.T) { - setupPGXWithConfig(t, postgres.DefaultConfig) + setupPGXWithConfig(t, postgres.TestConfig) } func setupPGXWithConfig(t *testing.T, config postgres.Config) { @@ -48,7 +48,7 @@ func setupPGXWithConfig(t *testing.T, config postgres.Config) { } func setupPGXNonCanonical(t *testing.T) { - setupPGXIndexer(t, postgres.DefaultConfig) + setupPGXIndexer(t, postgres.TestConfig) test.SetupTestDataNonCanonical(t, ind) } @@ -103,7 +103,7 @@ func TestPGXIndexer(t *testing.T) { }) t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) { - config := postgres.DefaultConfig + config := postgres.TestConfig config.CopyFrom = true setupPGXWithConfig(t, config) @@ -169,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) { } func TestPGXWatchAddressMethods(t *testing.T) { - setupPGXIndexer(t, postgres.DefaultConfig) + setupPGXIndexer(t, postgres.TestConfig) defer tearDown(t) defer checkTxClosure(t, 1, 0, 1) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 1cbec55c1..28c5aaa92 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -56,13 +56,14 @@ func ResolveDriverType(str string) (DriverType, error) { } } -// DefaultConfig are default parameters for connecting to a Postgres sql -var DefaultConfig = Config{ +// TestConfig specifies default parameters for connecting to a testing DB +var TestConfig = Config{ Hostname: "localhost", Port: 8077, DatabaseName: "cerc_testing", Username: "vdbm", Password: "password", + Driver: SQLX, } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/postgres/pgx_test.go b/statediff/indexer/database/sql/postgres/pgx_test.go index 043112e8d..86d082ace 100644 --- a/statediff/indexer/database/sql/postgres/pgx_test.go +++ b/statediff/indexer/database/sql/postgres/pgx_test.go @@ -31,7 +31,7 @@ import ( ) var ( - pgConfig, _ = postgres.MakeConfig(postgres.DefaultConfig) + pgConfig, _ = postgres.MakeConfig(postgres.TestConfig) ctx = context.Background() ) @@ -111,7 +111,7 @@ func TestPostgresPGX(t *testing.T) { badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewPGXDriver(ctx, postgres.DefaultConfig, badInfo) + _, err := postgres.NewPGXDriver(ctx, postgres.TestConfig, badInfo) if err == nil { t.Fatal("Expected an error") } diff --git a/statediff/indexer/database/sql/postgres/sqlx_test.go b/statediff/indexer/database/sql/postgres/sqlx_test.go index 40b976314..903f8715b 100644 --- a/statediff/indexer/database/sql/postgres/sqlx_test.go +++ b/statediff/indexer/database/sql/postgres/sqlx_test.go @@ -35,7 +35,7 @@ func TestPostgresSQLX(t *testing.T) { t.Run("connects to the database", func(t *testing.T) { var err error - connStr := postgres.DefaultConfig.DbConnectionString() + connStr := postgres.TestConfig.DbConnectionString() sqlxdb, err = sqlx.Connect("postgres", connStr) if err != nil { @@ -58,7 +58,7 @@ func TestPostgresSQLX(t *testing.T) { // sized int, so use string representation of big.Int // and cast on insert - connStr := postgres.DefaultConfig.DbConnectionString() + connStr := postgres.TestConfig.DbConnectionString() db, err := sqlx.Connect("postgres", connStr) if err != nil { t.Fatal(err) @@ -109,7 +109,7 @@ func TestPostgresSQLX(t *testing.T) { badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} - _, err := postgres.NewSQLXDriver(ctx, postgres.DefaultConfig, badInfo) + _, err := postgres.NewSQLXDriver(ctx, postgres.TestConfig, badInfo) if err == nil { t.Fatal("Expected an error") } diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index cb5255429..75c50e315 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -25,7 +25,7 @@ import ( // SetupSQLXDB is used to setup a sqlx db for tests func SetupSQLXDB() (sql.Database, error) { - conf := DefaultConfig + conf := TestConfig conf.MaxIdle = 0 driver, err := NewSQLXDriver(context.Background(), conf, node.Info{}) if err != nil { diff --git a/statediff/service_test.go b/statediff/service_test.go index ceea79ece..6e6ad200c 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -54,8 +54,8 @@ var ( parentHeader1 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot1} parentHeader2 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot2} - parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil, new(trie.Trie)) - parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil, new(trie.Trie)) + parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil, trie.NewEmpty(nil)) + parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil, trie.NewEmpty(nil)) parentHash1 = parentBlock1.Hash() parentHash2 = parentBlock2.Hash() @@ -67,9 +67,9 @@ var ( header2 = types.Header{ParentHash: parentHash2, Root: testRoot2, Number: big.NewInt(2)} header3 = types.Header{ParentHash: common.HexToHash("parent hash"), Root: testRoot3, Number: big.NewInt(3)} - testBlock1 = types.NewBlock(&header1, nil, nil, nil, new(trie.Trie)) - testBlock2 = types.NewBlock(&header2, nil, nil, nil, new(trie.Trie)) - testBlock3 = types.NewBlock(&header3, nil, nil, nil, new(trie.Trie)) + testBlock1 = types.NewBlock(&header1, nil, nil, nil, trie.NewEmpty(nil)) + testBlock2 = types.NewBlock(&header2, nil, nil, nil, trie.NewEmpty(nil)) + testBlock3 = types.NewBlock(&header3, nil, nil, nil, trie.NewEmpty(nil)) receiptRoot1 = common.HexToHash("0x05") receiptRoot2 = common.HexToHash("0x06") -- 2.45.2 From 79a7643a2896577f6ae3ca3a357ade4b0a2dcd3b Mon Sep 17 00:00:00 2001 From: Roy Crihfield Date: Tue, 16 May 2023 17:24:02 +0800 Subject: [PATCH 3/3] concurrent iterator: pass a constructor method instead of state.Trie This allows us to use our alt trie implementation --- trie/concurrent_iterator/iterator.go | 8 ++++++-- trie/concurrent_iterator/iterator_test.go | 2 +- trie/concurrent_iterator/tracker/tracker.go | 5 ++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/trie/concurrent_iterator/iterator.go b/trie/concurrent_iterator/iterator.go index de015ad87..5bd5f7f59 100644 --- a/trie/concurrent_iterator/iterator.go +++ b/trie/concurrent_iterator/iterator.go @@ -31,6 +31,10 @@ type PrefixBoundIterator struct { EndPath []byte } +// IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this +// code from the trie implementation. +type IteratorConstructor = func(startKey []byte) trie.NodeIterator + func (it *PrefixBoundIterator) Next(descend bool) bool { if it.EndPath == nil { return it.NodeIterator.Next(descend) @@ -130,10 +134,10 @@ func eachPrefixRange(prefix []byte, nbins uint, callback func([]byte, []byte)) { } // SubtrieIterators cuts a trie by path prefix, returning `nbins` iterators covering its subtries -func SubtrieIterators(tree state.Trie, nbins uint) []trie.NodeIterator { +func SubtrieIterators(makeIterator IteratorConstructor, nbins uint) []trie.NodeIterator { var iters []trie.NodeIterator eachPrefixRange(nil, nbins, func(from []byte, to []byte) { - it := tree.NodeIterator(HexToKeyBytes(from)) + it := makeIterator(HexToKeyBytes(from)) iters = append(iters, NewPrefixBoundIterator(it, from, to)) }) return iters diff --git a/trie/concurrent_iterator/iterator_test.go b/trie/concurrent_iterator/iterator_test.go index de3080ff8..b39e60e25 100644 --- a/trie/concurrent_iterator/iterator_test.go +++ b/trie/concurrent_iterator/iterator_test.go @@ -82,7 +82,7 @@ func TestIterator(t *testing.T) { allPaths := fixt.Block1_Paths cases := []uint{1, 2, 4, 8, 16, 32} runCase := func(t *testing.T, nbins uint) { - iters := iter.SubtrieIterators(tree, nbins) + iters := iter.SubtrieIterators(tree.NodeIterator, nbins) ix := 0 for b := uint(0); b < nbins; b++ { for it := iters[b]; it.Next(true); ix++ { diff --git a/trie/concurrent_iterator/tracker/tracker.go b/trie/concurrent_iterator/tracker/tracker.go index 0ee5dad80..7c7029048 100644 --- a/trie/concurrent_iterator/tracker/tracker.go +++ b/trie/concurrent_iterator/tracker/tracker.go @@ -9,7 +9,6 @@ import ( "os/signal" "syscall" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/trie" @@ -122,7 +121,7 @@ func (tr *Tracker) dump() error { // Restore attempts to read iterator state from file // if file doesn't exist, returns an empty slice with no error -func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) { +func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) { file, err := os.Open(tr.recoveryFile) if err != nil { if os.IsNotExist(err) { @@ -169,7 +168,7 @@ func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) { decrementPath(startPath) startPath = append(startPath, 0) } - it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath) + it := iter.NewPrefixBoundIterator(makeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath) ret = append(ret, tr.Tracked(it, recoveredPath)) } -- 2.45.2