From 7b12783a1889da84d7de3c80c553387c0a51b4fb Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 15 Apr 2021 10:18:31 -0500 Subject: [PATCH] fix connection leak (misplaced defer) and perform proper rollback on errs --- params/version.go | 2 +- statediff/indexer/indexer.go | 8 ++++---- statediff/indexer/indexer_test.go | 2 +- statediff/service.go | 11 ++++++----- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/params/version.go b/params/version.go index b23fea6c9..9b7e67f76 100644 --- a/params/version.go +++ b/params/version.go @@ -24,7 +24,7 @@ const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release VersionPatch = 2 // Patch version component of the current release - VersionMeta = "statediff-0.0.17" // Version metadata to append to the version string + VersionMeta = "statediff-0.0.18" // Version metadata to append to the version string ) // Version holds the textual version string. diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index bad64bf8d..2cc09fe98 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -73,8 +73,7 @@ type BlockTx struct { dbtx *sqlx.Tx BlockNumber uint64 headerID int64 - err error - Close func() error + Close func(err error) error } // Reporting function to run as goroutine @@ -128,11 +127,12 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip blocktx := BlockTx{ dbtx: tx, // handle transaction commit or rollback for any return case - Close: func() error { - var err error + Close: func(err error) error { if p := recover(); p != nil { shared.Rollback(tx) panic(p) + } else if err != nil { + shared.Rollback(tx) } else { tDiff := time.Since(t) indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) diff --git a/statediff/indexer/indexer_test.go b/statediff/indexer/indexer_test.go index 86708a232..81bd9948c 100644 --- a/statediff/indexer/indexer_test.go +++ b/statediff/indexer/indexer_test.go @@ -117,7 +117,7 @@ func setup(t *testing.T) { if err != nil { t.Fatal(err) } - defer tx.Close() + defer tx.Close(err) for _, node := range mocks.StateDiffs { err = ind.PushStateNode(tx, node) if err != nil { diff --git a/statediff/service.go b/statediff/service.go index 898c319e9..9784306e5 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -23,14 +23,13 @@ import ( "sync" "sync/atomic" - "github.com/ethereum/go-ethereum/eth/ethconfig" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/eth/ethconfig" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" @@ -626,18 +625,20 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts + var err error + var tx *ind.BlockTx if params.IncludeTD { totalDifficulty = sds.BlockChain.GetTdByHash(block.Hash()) } if params.IncludeReceipts { receipts = sds.BlockChain.GetReceiptsByHash(block.Hash()) } - tx, err := sds.indexer.PushBlock(block, receipts, totalDifficulty) + tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty) + // defer handling of commit/rollback for any return case + defer tx.Close(err) if err != nil { return err } - // defer handling of commit/rollback for any return case - defer tx.Close() output := func(node StateNode) error { return sds.indexer.PushStateNode(tx, node) }