check tx pool state at end of unit tests #199
@ -25,8 +25,6 @@ import (
|
||||
"math/big"
|
||||
"time"
|
||||
|
||||
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
node "github.com/ipfs/go-ipld-format"
|
||||
"github.com/multiformats/go-multihash"
|
||||
@ -39,6 +37,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/params"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
ipld2 "github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
||||
@ -155,9 +154,11 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
close(self.iplds)
|
||||
}()
|
||||
if p := recover(); p != nil {
|
||||
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
|
||||
rollback(sdi.ctx, tx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
log.Info("error detected before tx submission, rolling back the tx", "error", err)
|
||||
rollback(sdi.ctx, tx)
|
||||
} else {
|
||||
tDiff := time.Since(t)
|
||||
|
@ -6,12 +6,12 @@ import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
|
||||
@ -141,3 +141,15 @@ func expectTrue(t *testing.T, value bool) {
|
||||
t.Fatalf("Assertion failed")
|
||||
}
|
||||
}
|
||||
|
||||
func checkTxClosure(t *testing.T, idle, inUse, open int64) {
|
||||
require.Equal(t, idle, db.Stats().Idle())
|
||||
require.Equal(t, inUse, db.Stats().InUse())
|
||||
require.Equal(t, open, db.Stats().Open())
|
||||
}
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
sql.TearDownDB(t, db)
|
||||
err := ind.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
@ -103,6 +103,9 @@ func setup(t *testing.T, testBlock *types.Block, testReceipts types.Receipts) {
|
||||
}
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
require.Equal(t, int64(0), db.Stats().Idle())
|
||||
require.Equal(t, int64(0), db.Stats().InUse())
|
||||
require.Equal(t, int64(0), db.Stats().Open())
|
||||
sql.TearDownDB(t, db)
|
||||
err = ind.Close()
|
||||
require.NoError(t, err)
|
||||
|
@ -59,10 +59,11 @@ func setupLegacyPGX(t *testing.T) {
|
||||
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
|
||||
}
|
||||
|
||||
func TestPGXIndexerLegacy(t *testing.T) {
|
||||
func TestLegacyPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||
setupLegacyPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
|
||||
FROM eth.header_cids
|
||||
WHERE block_number = $1`
|
||||
|
@ -69,6 +69,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, coinbase
|
||||
FROM eth.header_cids
|
||||
WHERE block_number = $1`
|
||||
@ -111,6 +112,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
// check that txs were properly indexed and published
|
||||
trxs := make([]string, 0)
|
||||
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
|
||||
@ -237,6 +239,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
|
||||
rcts := make([]string, 0)
|
||||
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
|
||||
@ -294,6 +297,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
|
||||
// check receipts were properly indexed and published
|
||||
rcts := make([]string, 0)
|
||||
@ -395,6 +399,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
// check that state nodes were properly indexed and published
|
||||
stateNodes := make([]models.StateNodeModel, 0)
|
||||
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
@ -484,6 +489,7 @@ func TestPGXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||
setupPGX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 1, 0, 1)
|
||||
// check that storage nodes were properly indexed
|
||||
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
|
||||
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||
|
@ -45,13 +45,10 @@ func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDri
|
||||
if config.MaxConns > 0 {
|
||||
db.SetMaxOpenConns(config.MaxConns)
|
||||
}
|
||||
if config.MaxIdle > 0 {
|
||||
db.SetMaxIdleConns(config.MaxIdle)
|
||||
}
|
||||
if config.MaxConnLifetime > 0 {
|
||||
lifetime := config.MaxConnLifetime
|
||||
db.SetConnMaxLifetime(lifetime)
|
||||
db.SetConnMaxLifetime(config.MaxConnLifetime)
|
||||
}
|
||||
db.SetMaxIdleConns(config.MaxIdle)
|
||||
driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node}
|
||||
if err := driver.createNode(); err != nil {
|
||||
return &SQLXDriver{}, ErrUnableToSetNode(err)
|
||||
|
@ -25,7 +25,9 @@ import (
|
||||
|
||||
// SetupSQLXDB is used to setup a sqlx db for tests
|
||||
func SetupSQLXDB() (sql.Database, error) {
|
||||
driver, err := NewSQLXDriver(context.Background(), DefaultConfig, node.Info{})
|
||||
conf := DefaultConfig
|
||||
conf.MaxIdle = 0
|
||||
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -69,10 +69,11 @@ func setupLegacySQLX(t *testing.T) {
|
||||
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64())
|
||||
}
|
||||
|
||||
func TestSQLXIndexerLegacy(t *testing.T) {
|
||||
func TestLegacySQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs", func(t *testing.T) {
|
||||
setupLegacySQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
pgStr := `SELECT cid, td, reward, block_hash, coinbase
|
||||
FROM eth.header_cids
|
||||
WHERE block_number = $1`
|
||||
|
@ -66,17 +66,11 @@ func setupSQLX(t *testing.T) {
|
||||
test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64())
|
||||
}
|
||||
|
||||
func tearDown(t *testing.T) {
|
||||
sql.TearDownDB(t, db)
|
||||
if err := ind.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
pgStr := `SELECT cid, td, reward, block_hash, coinbase
|
||||
FROM eth.header_cids
|
||||
WHERE block_number = $1`
|
||||
@ -114,6 +108,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
// check that txs were properly indexed and published
|
||||
trxs := make([]string, 0)
|
||||
pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash)
|
||||
@ -240,6 +235,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
|
||||
rcts := make([]string, 0)
|
||||
rctsPgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids
|
||||
@ -295,6 +291,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
|
||||
// check receipts were properly indexed and published
|
||||
rcts := make([]string, 0)
|
||||
@ -395,6 +392,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
// check that state nodes were properly indexed and published
|
||||
stateNodes := make([]models.StateNodeModel, 0)
|
||||
pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
@ -484,6 +482,7 @@ func TestSQLXIndexer(t *testing.T) {
|
||||
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||
setupSQLX(t)
|
||||
defer tearDown(t)
|
||||
defer checkTxClosure(t, 0, 0, 0)
|
||||
// check that storage nodes were properly indexed
|
||||
storageNodes := make([]models.StorageNodeWithStateKeyModel, 0)
|
||||
pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||
|
@ -39,12 +39,11 @@ import (
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
|
||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -705,6 +704,9 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
||||
err = sds.writeStateDiff(block, parentRoot, params)
|
||||
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
||||
// Retry only when the deadlock is detected.
|
||||
if i != sds.maxRetry {
|
||||
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
|
||||
}
|
||||
continue
|
||||
}
|
||||
break
|
||||
|
Loading…
Reference in New Issue
Block a user