check tx pool state at end of unit tests #199

Closed
i-norden wants to merge 2 commits from trouble_shooting into v1.10.15-statediff-v3
10 changed files with 45 additions and 21 deletions

View File

@ -25,8 +25,6 @@ import (
"math/big"
"time"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ipfs/go-cid"
node "github.com/ipfs/go-ipld-format"
"github.com/multiformats/go-multihash"
@ -39,6 +37,7 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
@ -155,9 +154,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
close(self.iplds)
}()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)

View File

@ -6,12 +6,12 @@ import (
"os"
"testing"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
@ -141,3 +141,15 @@ func expectTrue(t *testing.T, value bool) {
t.Fatalf("Assertion failed")
}
}
func checkTxClosure(t *testing.T, idle, inUse, open int64) {
require.Equal(t, idle, db.Stats().Idle())
require.Equal(t, inUse, db.Stats().InUse())
require.Equal(t, open, db.Stats().Open())
}
func tearDown(t *testing.T) {
sql.TearDownDB(t, db)
err := ind.Close()
require.NoError(t, err)
}

View File

@ -103,6 +103,9 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
}
func tearDown(t *testing.T) {
require.Equal(t, int64(0), db.Stats().Idle())
require.Equal(t, int64(0), db.Stats().InUse())
require.Equal(t, int64(0), db.Stats().Open())
sql.TearDownDB(t, db)
err = ind.Close()
require.NoError(t, err)

View File

@ -59,10 +59,11 @@ func setupLegacyPGX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}
func TestPGXIndexerLegacy(t *testing.T) {
func TestLegacyPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`

View File

@ -69,6 +69,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
@ -111,6 +112,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that txs were properly indexed and published
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
@ -237,6 +239,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
rcts := make([]string, 0)
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
@ -294,6 +297,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check receipts were properly indexed and published
rcts := make([]string, 0)
@ -395,6 +399,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
@ -484,6 +489,7 @@ func TestPGXIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path

View File

@ -45,13 +45,10 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri
if config.MaxConns > 0 {
db.SetMaxOpenConns(config.MaxConns)
}
if config.MaxIdle > 0 {
db.SetMaxIdleConns(config.MaxIdle)
}
if config.MaxConnLifetime > 0 {
lifetime := config.MaxConnLifetime
db.SetConnMaxLifetime(lifetime)
db.SetConnMaxLifetime(config.MaxConnLifetime)
}
db.SetMaxIdleConns(config.MaxIdle)
driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node}
if err := driver.createNode(); err != nil {
return &SQLXDriver{}, ErrUnableToSetNode(err)

View File

@ -25,7 +25,9 @@ import (
// SetupSQLXDB is used to setup a sqlx db for tests
func SetupSQLXDB() (sql.Database, error) {
driver, err := NewSQLXDriver(context.Background(), DefaultConfig, node.Info{})
conf := DefaultConfig
conf.MaxIdle = 0
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{})
if err != nil {
return nil, err
}

View File

@ -69,10 +69,11 @@ func setupLegacySQLX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
}
func TestSQLXIndexerLegacy(t *testing.T) {
func TestLegacySQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacySQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`

View File

@ -66,17 +66,11 @@ func setupSQLX(t *testing.T) {
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
}
func tearDown(t *testing.T) {
sql.TearDownDB(t, db)
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}
func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
pgStr := `SELECT cid, td, reward, block_hash, coinbase
FROM eth.header_cids
WHERE block_number = $1`
@ -114,6 +108,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that txs were properly indexed and published
trxs := make([]string, 0)
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
@ -240,6 +235,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
rcts := make([]string, 0)
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
@ -295,6 +291,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check receipts were properly indexed and published
rcts := make([]string, 0)
@ -395,6 +392,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
@ -484,6 +482,7 @@ func TestSQLXIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
// check that storage nodes were properly indexed
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path

View File

@ -39,12 +39,11 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
ind "github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
types2 "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
)
const (
@ -705,6 +704,9 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
err = sds.writeStateDiff(block, parentRoot, params)
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
// Retry only when the deadlock is detected.
if i != sds.maxRetry {
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
}
continue
}
break