Patch for concurrent iterator & others #383

Merged
telackey merged 2 commits from roy/v5-dev into v1.11.5-statediff-v5 2023-05-18 17:07:13 +00:00
11 changed files with 29 additions and 25 deletions

View File

@ -253,7 +253,7 @@ func (met *dbMetricsHandles) Update(stats DbStats) {
func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) { func ReportAndUpdateDuration(msg string, start time.Time, logger log.Logger, timer metrics.Timer) {
since := UpdateDuration(start, timer) since := UpdateDuration(start, timer)
logger.Debug(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds())) logger.Trace(fmt.Sprintf("%s duration=%dms", msg, since.Milliseconds()))
} }
func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration { func UpdateDuration(start time.Time, timer metrics.Timer) time.Duration {

View File

@ -28,7 +28,7 @@ import (
) )
func setupLegacyPGXIndexer(t *testing.T) { func setupLegacyPGXIndexer(t *testing.T) {
db, err = postgres.SetupPGXDB(postgres.DefaultConfig) db, err = postgres.SetupPGXDB(postgres.TestConfig)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -39,7 +39,7 @@ func setupPGXIndexer(t *testing.T, config postgres.Config) {
} }
func setupPGX(t *testing.T) { func setupPGX(t *testing.T) {
setupPGXWithConfig(t, postgres.DefaultConfig) setupPGXWithConfig(t, postgres.TestConfig)
} }
func setupPGXWithConfig(t *testing.T, config postgres.Config) { func setupPGXWithConfig(t *testing.T, config postgres.Config) {
@ -48,7 +48,7 @@ func setupPGXWithConfig(t *testing.T, config postgres.Config) {
} }
func setupPGXNonCanonical(t *testing.T) { func setupPGXNonCanonical(t *testing.T) {
setupPGXIndexer(t, postgres.DefaultConfig) setupPGXIndexer(t, postgres.TestConfig)
test.SetupTestDataNonCanonical(t, ind) test.SetupTestDataNonCanonical(t, ind)
} }
@ -103,7 +103,7 @@ func TestPGXIndexer(t *testing.T) {
}) })
t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) { t.Run("Publish and index with CopyFrom enabled.", func(t *testing.T) {
config := postgres.DefaultConfig config := postgres.TestConfig
config.CopyFrom = true config.CopyFrom = true
setupPGXWithConfig(t, config) setupPGXWithConfig(t, config)
@ -169,7 +169,7 @@ func TestPGXIndexerNonCanonical(t *testing.T) {
} }
func TestPGXWatchAddressMethods(t *testing.T) { func TestPGXWatchAddressMethods(t *testing.T) {
setupPGXIndexer(t, postgres.DefaultConfig) setupPGXIndexer(t, postgres.TestConfig)
defer tearDown(t) defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1) defer checkTxClosure(t, 1, 0, 1)

View File

@ -56,13 +56,14 @@ func ResolveDriverType(str string) (DriverType, error) {
} }
} }
// DefaultConfig are default parameters for connecting to a Postgres sql // TestConfig specifies default parameters for connecting to a testing DB
var DefaultConfig = Config{ var TestConfig = Config{
Hostname: "localhost", Hostname: "localhost",
Port: 8077, Port: 8077,
DatabaseName: "cerc_testing", DatabaseName: "cerc_testing",
Username: "vdbm", Username: "vdbm",
Password: "password", Password: "password",
Driver: SQLX,
} }
// Config holds params for a Postgres db // Config holds params for a Postgres db

View File

@ -31,7 +31,7 @@ import (
) )
var ( var (
pgConfig, _ = postgres.MakeConfig(postgres.DefaultConfig) pgConfig, _ = postgres.MakeConfig(postgres.TestConfig)
ctx = context.Background() ctx = context.Background()
) )
@ -111,7 +111,7 @@ func TestPostgresPGX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewPGXDriver(ctx, postgres.DefaultConfig, badInfo) _, err := postgres.NewPGXDriver(ctx, postgres.TestConfig, badInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }

View File

@ -35,7 +35,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Run("connects to the database", func(t *testing.T) { t.Run("connects to the database", func(t *testing.T) {
var err error var err error
connStr := postgres.DefaultConfig.DbConnectionString() connStr := postgres.TestConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr) sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil { if err != nil {
@ -58,7 +58,7 @@ func TestPostgresSQLX(t *testing.T) {
// sized int, so use string representation of big.Int // sized int, so use string representation of big.Int
// and cast on insert // and cast on insert
connStr := postgres.DefaultConfig.DbConnectionString() connStr := postgres.TestConfig.DbConnectionString()
db, err := sqlx.Connect("postgres", connStr) db, err := sqlx.Connect("postgres", connStr)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -109,7 +109,7 @@ func TestPostgresSQLX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100)) badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"} badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewSQLXDriver(ctx, postgres.DefaultConfig, badInfo) _, err := postgres.NewSQLXDriver(ctx, postgres.TestConfig, badInfo)
if err == nil { if err == nil {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }

View File

@ -25,7 +25,7 @@ import (
// SetupSQLXDB is used to setup a sqlx db for tests // SetupSQLXDB is used to setup a sqlx db for tests
func SetupSQLXDB() (sql.Database, error) { func SetupSQLXDB() (sql.Database, error) {
conf := DefaultConfig conf := TestConfig
conf.MaxIdle = 0 conf.MaxIdle = 0
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{}) driver, err := NewSQLXDriver(context.Background(), conf, node.Info{})
if err != nil { if err != nil {

View File

@ -54,8 +54,8 @@ var (
parentHeader1 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot1} parentHeader1 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot1}
parentHeader2 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot2} parentHeader2 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot2}
parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil, new(trie.Trie)) parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil, trie.NewEmpty(nil))
parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil, new(trie.Trie)) parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil, trie.NewEmpty(nil))
parentHash1 = parentBlock1.Hash() parentHash1 = parentBlock1.Hash()
parentHash2 = parentBlock2.Hash() parentHash2 = parentBlock2.Hash()
@ -67,9 +67,9 @@ var (
header2 = types.Header{ParentHash: parentHash2, Root: testRoot2, Number: big.NewInt(2)} header2 = types.Header{ParentHash: parentHash2, Root: testRoot2, Number: big.NewInt(2)}
header3 = types.Header{ParentHash: common.HexToHash("parent hash"), Root: testRoot3, Number: big.NewInt(3)} header3 = types.Header{ParentHash: common.HexToHash("parent hash"), Root: testRoot3, Number: big.NewInt(3)}
testBlock1 = types.NewBlock(&header1, nil, nil, nil, new(trie.Trie)) testBlock1 = types.NewBlock(&header1, nil, nil, nil, trie.NewEmpty(nil))
testBlock2 = types.NewBlock(&header2, nil, nil, nil, new(trie.Trie)) testBlock2 = types.NewBlock(&header2, nil, nil, nil, trie.NewEmpty(nil))
testBlock3 = types.NewBlock(&header3, nil, nil, nil, new(trie.Trie)) testBlock3 = types.NewBlock(&header3, nil, nil, nil, trie.NewEmpty(nil))
receiptRoot1 = common.HexToHash("0x05") receiptRoot1 = common.HexToHash("0x05")
receiptRoot2 = common.HexToHash("0x06") receiptRoot2 = common.HexToHash("0x06")

View File

@ -31,6 +31,10 @@ type PrefixBoundIterator struct {
EndPath []byte EndPath []byte
} }
// IteratorConstructor is a constructor returning a NodeIterator, which is used to decouple this
// code from the trie implementation.
type IteratorConstructor = func(startKey []byte) trie.NodeIterator
func (it *PrefixBoundIterator) Next(descend bool) bool { func (it *PrefixBoundIterator) Next(descend bool) bool {
if it.EndPath == nil { if it.EndPath == nil {
return it.NodeIterator.Next(descend) return it.NodeIterator.Next(descend)
@ -130,10 +134,10 @@ func eachPrefixRange(prefix []byte, nbins uint, callback func([]byte, []byte)) {
} }
// SubtrieIterators cuts a trie by path prefix, returning `nbins` iterators covering its subtries // SubtrieIterators cuts a trie by path prefix, returning `nbins` iterators covering its subtries
func SubtrieIterators(tree state.Trie, nbins uint) []trie.NodeIterator { func SubtrieIterators(makeIterator IteratorConstructor, nbins uint) []trie.NodeIterator {
var iters []trie.NodeIterator var iters []trie.NodeIterator
eachPrefixRange(nil, nbins, func(from []byte, to []byte) { eachPrefixRange(nil, nbins, func(from []byte, to []byte) {
it := tree.NodeIterator(HexToKeyBytes(from)) it := makeIterator(HexToKeyBytes(from))
iters = append(iters, NewPrefixBoundIterator(it, from, to)) iters = append(iters, NewPrefixBoundIterator(it, from, to))
}) })
return iters return iters

View File

@ -82,7 +82,7 @@ func TestIterator(t *testing.T) {
allPaths := fixt.Block1_Paths allPaths := fixt.Block1_Paths
cases := []uint{1, 2, 4, 8, 16, 32} cases := []uint{1, 2, 4, 8, 16, 32}
runCase := func(t *testing.T, nbins uint) { runCase := func(t *testing.T, nbins uint) {
iters := iter.SubtrieIterators(tree, nbins) iters := iter.SubtrieIterators(tree.NodeIterator, nbins)
ix := 0 ix := 0
for b := uint(0); b < nbins; b++ { for b := uint(0); b < nbins; b++ {
for it := iters[b]; it.Next(true); ix++ { for it := iters[b]; it.Next(true); ix++ {

View File

@ -9,7 +9,6 @@ import (
"os/signal" "os/signal"
"syscall" "syscall"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
@ -122,7 +121,7 @@ func (tr *Tracker) dump() error {
// Restore attempts to read iterator state from file // Restore attempts to read iterator state from file
// if file doesn't exist, returns an empty slice with no error // if file doesn't exist, returns an empty slice with no error
func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) { func (tr *Tracker) Restore(makeIterator iter.IteratorConstructor) ([]trie.NodeIterator, error) {
file, err := os.Open(tr.recoveryFile) file, err := os.Open(tr.recoveryFile)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -169,7 +168,7 @@ func (tr *Tracker) Restore(tree state.Trie) ([]trie.NodeIterator, error) {
decrementPath(startPath) decrementPath(startPath)
startPath = append(startPath, 0) startPath = append(startPath, 0)
} }
it := iter.NewPrefixBoundIterator(tree.NodeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath) it := iter.NewPrefixBoundIterator(makeIterator(iter.HexToKeyBytes(startPath)), startPath, endPath)
ret = append(ret, tr.Tracked(it, recoveredPath)) ret = append(ret, tr.Tracked(it, recoveredPath))
} }