From 49abb1a2fa6f456725c542a54fcb329ee86d03bd Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 31 May 2023 13:47:34 -0500 Subject: [PATCH 1/2] Fix rollback call on error. --- statediff/indexer/database/sql/indexer.go | 24 +++++++++++------------ statediff/indexer/database/sql/lazy_tx.go | 10 +++++----- statediff/metrics_helpers.go | 11 ++++++----- statediff/service.go | 4 ++-- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 8a6228fa5..8581df92c 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -116,14 +116,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // Begin new DB tx for everything tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { + defer func(e *error) { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if err != nil { + } else if e != nil && *e != nil { rollback(sdi.ctx, tx) } - }() + }(&err) blockTx := &BatchTx{ removedCacheFlag: new(uint32), ctx: sdi.ctx, @@ -496,16 +496,16 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { // InsertWatchedAddresses inserts the given addresses in the database func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { + defer func(e *error) { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if err != nil { + } else if e != nil && *e != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }() + }(&err) for _, arg := range args { _, err = tx.Exec(sdi.ctx, `INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`, @@ -521,16 +521,16 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA // RemoveWatchedAddresses removes the given watched addresses from the database func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { + defer func(e *error) { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if err != nil { + } else if e != nil && *e != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }() + }(&err) for _, arg := range args { _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses WHERE address = $1`, arg.Address) @@ -545,16 +545,16 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA // SetWatchedAddresses clears and inserts the given addresses in the database func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func() { + defer func(e *error) { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if err != nil { + } else if e != nil && *e != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }() + }(&err) _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses`) if err != nil { diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index b2445e0d8..c392f3215 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -73,24 +73,24 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { if err != nil { return err } - defer func() { + defer func(e *error) { if p := recover(); p != nil { rollback(ctx, base) panic(p) - } else if err != nil { + } else if e != nil && *e != nil { rollback(ctx, base) } - }() + }(&err) for _, item := range tx.cache { switch item := item.(type) { case *copyFrom: - _, err := base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) + _, err = base.CopyFrom(ctx, item.tableName, item.columnNames, item.rows) if err != nil { log.Error("COPY error", "table", item.tableName, "err", err) return err } case cachedStmt: - _, err := base.Exec(ctx, item.sql, item.args...) + _, err = base.Exec(ctx, item.sql, item.args...) if err != nil { return err } diff --git a/statediff/metrics_helpers.go b/statediff/metrics_helpers.go index 2bebfe253..0bca5e374 100644 --- a/statediff/metrics_helpers.go +++ b/statediff/metrics_helpers.go @@ -39,18 +39,19 @@ func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) { return start, logger } -func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration { +func countStateDiffEnd(start time.Time, logger log.Logger, err *error) time.Duration { duration := time.Since(start) defaultStatediffMetrics.underway.Dec(1) - if nil == err { - defaultStatediffMetrics.succeeded.Inc(1) - } else { + failed := nil != err && nil != *err + if failed { defaultStatediffMetrics.failed.Inc(1) + } else { + defaultStatediffMetrics.succeeded.Inc(1) } defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds()) logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]", - duration.Milliseconds(), nil != err, + duration.Milliseconds(), failed, defaultStatediffMetrics.underway.Count(), defaultStatediffMetrics.succeeded.Count(), defaultStatediffMetrics.failed.Count(), diff --git a/statediff/service.go b/statediff/service.go index 738723391..ec25d9c6a 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -815,7 +815,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p var err error var tx interfaces.Batch start, logger := countStateDiffBegin(block) - defer countStateDiffEnd(start, logger, err) + defer countStateDiffEnd(start, logger, &err) if params.IncludeTD { totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) } @@ -847,7 +847,7 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p BlockNumber: block.Number(), }, params, output, ipldOutput) // TODO this anti-pattern needs to be sorted out eventually - if err := tx.Submit(err); err != nil { + if err = tx.Submit(err); err != nil { return fmt.Errorf("batch transaction submission failed: %w", err) } -- 2.45.2 From 0c4c2cade0d6a3afee731e1accff82cb640c8d1f Mon Sep 17 00:00:00 2001 From: Thomas E Lackey Date: Wed, 31 May 2023 15:37:09 -0500 Subject: [PATCH 2/2] Skip the pointers. --- statediff/indexer/database/sql/indexer.go | 24 +++++++++++------------ statediff/indexer/database/sql/lazy_tx.go | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 8581df92c..8a6228fa5 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -116,14 +116,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // Begin new DB tx for everything tx := NewDelayedTx(sdi.dbWriter.db) - defer func(e *error) { + defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if e != nil && *e != nil { + } else if err != nil { rollback(sdi.ctx, tx) } - }(&err) + }() blockTx := &BatchTx{ removedCacheFlag: new(uint32), ctx: sdi.ctx, @@ -496,16 +496,16 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { // InsertWatchedAddresses inserts the given addresses in the database func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func(e *error) { + defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if e != nil && *e != nil { + } else if err != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }(&err) + }() for _, arg := range args { _, err = tx.Exec(sdi.ctx, `INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`, @@ -521,16 +521,16 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA // RemoveWatchedAddresses removes the given watched addresses from the database func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func(e *error) { + defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if e != nil && *e != nil { + } else if err != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }(&err) + }() for _, arg := range args { _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses WHERE address = $1`, arg.Address) @@ -545,16 +545,16 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA // SetWatchedAddresses clears and inserts the given addresses in the database func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { tx := NewDelayedTx(sdi.dbWriter.db) - defer func(e *error) { + defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) panic(p) - } else if e != nil && *e != nil { + } else if err != nil { rollback(sdi.ctx, tx) } else { err = tx.Commit(sdi.ctx) } - }(&err) + }() _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses`) if err != nil { diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go index c392f3215..883b10aca 100644 --- a/statediff/indexer/database/sql/lazy_tx.go +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -73,14 +73,14 @@ func (tx *DelayedTx) Commit(ctx context.Context) error { if err != nil { return err } - defer func(e *error) { + defer func() { if p := recover(); p != nil { rollback(ctx, base) panic(p) - } else if e != nil && *e != nil { + } else if err != nil { rollback(ctx, base) } - }(&err) + }() for _, item := range tx.cache { switch item := item.(type) { case *copyFrom: -- 2.45.2