367: Fix deferred rollback on error. #391

Merged
telackey merged 2 commits from telackey/367v5 into v1.11.6-statediff-v5 2023-06-01 00:36:37 +00:00
2 changed files with 15 additions and 15 deletions
Showing only changes of commit 0c4c2cade0 - Show all commits

View File

@ -116,14 +116,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// Begin new DB tx for everything // Begin new DB tx for everything
tx := NewDelayedTx(sdi.dbWriter.db) tx := NewDelayedTx(sdi.dbWriter.db)
defer func(e *error) { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
panic(p) panic(p)
} else if e != nil && *e != nil { } else if err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
} }
}(&err) }()
blockTx := &BatchTx{ blockTx := &BatchTx{
removedCacheFlag: new(uint32), removedCacheFlag: new(uint32),
ctx: sdi.ctx, ctx: sdi.ctx,
@ -496,16 +496,16 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
// InsertWatchedAddresses inserts the given addresses in the database // InsertWatchedAddresses inserts the given addresses in the database
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) {
tx := NewDelayedTx(sdi.dbWriter.db) tx := NewDelayedTx(sdi.dbWriter.db)
defer func(e *error) { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
panic(p) panic(p)
} else if e != nil && *e != nil { } else if err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
} else { } else {
err = tx.Commit(sdi.ctx) err = tx.Commit(sdi.ctx)
} }
}(&err) }()
for _, arg := range args { for _, arg := range args {
_, err = tx.Exec(sdi.ctx, `INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`, _, err = tx.Exec(sdi.ctx, `INSERT INTO eth_meta.watched_addresses (address, created_at, watched_at) VALUES ($1, $2, $3) ON CONFLICT (address) DO NOTHING`,
@ -521,16 +521,16 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
// RemoveWatchedAddresses removes the given watched addresses from the database // RemoveWatchedAddresses removes the given watched addresses from the database
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) { func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) {
tx := NewDelayedTx(sdi.dbWriter.db) tx := NewDelayedTx(sdi.dbWriter.db)
defer func(e *error) { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
panic(p) panic(p)
} else if e != nil && *e != nil { } else if err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
} else { } else {
err = tx.Commit(sdi.ctx) err = tx.Commit(sdi.ctx)
} }
}(&err) }()
for _, arg := range args { for _, arg := range args {
_, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses WHERE address = $1`, arg.Address) _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses WHERE address = $1`, arg.Address)
@ -545,16 +545,16 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA
// SetWatchedAddresses clears and inserts the given addresses in the database // SetWatchedAddresses clears and inserts the given addresses in the database
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) {
tx := NewDelayedTx(sdi.dbWriter.db) tx := NewDelayedTx(sdi.dbWriter.db)
defer func(e *error) { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
panic(p) panic(p)
} else if e != nil && *e != nil { } else if err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
} else { } else {
err = tx.Commit(sdi.ctx) err = tx.Commit(sdi.ctx)
} }
}(&err) }()
_, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses`) _, err = tx.Exec(sdi.ctx, `DELETE FROM eth_meta.watched_addresses`)
if err != nil { if err != nil {

View File

@ -73,14 +73,14 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
defer func(e *error) { defer func() {
if p := recover(); p != nil { if p := recover(); p != nil {
rollback(ctx, base) rollback(ctx, base)
panic(p) panic(p)
} else if e != nil && *e != nil { } else if err != nil {
rollback(ctx, base) rollback(ctx, base)
} }
}(&err) }()
for _, item := range tx.cache { for _, item := range tx.cache {
switch item := item.(type) { switch item := item.(type) {
case *copyFrom: case *copyFrom: