From 4ee8214562fbe4f925d7e997bae63b4029049878 Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 17 Nov 2021 19:37:41 -0600 Subject: [PATCH] fix unit tests --- .../database/sql/pgx_indexer_legacy_test.go | 16 ++--- .../indexer/database/sql/pgx_indexer_test.go | 60 +++++++++---------- .../database/sql/sqlx_indexer_legacy_test.go | 14 ++--- .../indexer/database/sql/sqlx_indexer_test.go | 58 +++++++++--------- statediff/indexer/database/sql/writer.go | 2 +- statediff/service.go | 9 +++ 6 files changed, 84 insertions(+), 75 deletions(-) diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 0373ff5ee..21b74b3b2 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -20,7 +20,6 @@ import ( "context" "testing" - "github.com/jmoiron/sqlx" "github.com/multiformats/go-multihash" "github.com/stretchr/testify/require" @@ -64,20 +63,21 @@ func TestPGXIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { setupLegacyPGX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) - err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) + err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).Scan( + &header.CID, &header.TD, &header.Reward, &header.BlockHash, &header.BaseFee) require.NoError(t, err) test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index f63efe712..26cc660ec 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -140,7 +140,7 @@ func setupPGX(t *testing.T) { } }() for _, node := range mocks.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) if err != nil { t.Fatal(err) } @@ -153,16 +153,16 @@ func TestPGXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupPGX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, td, reward, block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) @@ -192,7 +192,7 @@ func TestPGXIndexer(t *testing.T) { defer tearDown(t) // check that txs were properly indexed trxs := make([]string, 0) - pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1` err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -260,7 +260,7 @@ func TestPGXIndexer(t *testing.T) { t.Fatalf("expected AccessListTxType (1), got %d", *txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -299,8 +299,8 @@ func TestPGXIndexer(t *testing.T) { rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 ORDER BY transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) @@ -317,8 +317,8 @@ func TestPGXIndexer(t *testing.T) { } for i := range rcts { results := make([]logIPLD, 0) - pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids - INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id) INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` err = db.Select(context.Background(), &results, pgStr, rcts[i]) @@ -350,9 +350,9 @@ func TestPGXIndexer(t *testing.T) { // check receipts were properly indexed rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1 order by transaction_cids.id` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 order by transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -447,8 +447,8 @@ func TestPGXIndexer(t *testing.T) { defer tearDown(t) // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) - pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type != 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -467,9 +467,9 @@ func TestPGXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel - err = db.Get(context.Background(), &account, pgStr, stateNode.ID) + err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } @@ -479,8 +479,8 @@ func TestPGXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, @@ -493,8 +493,8 @@ func TestPGXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, @@ -505,8 +505,8 @@ func TestPGXIndexer(t *testing.T) { // check that Removed state nodes were properly indexed and published stateNodes = make([]models.StateNodeModel, 0) - pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type = 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -538,8 +538,8 @@ func TestPGXIndexer(t *testing.T) { storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type != 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) @@ -571,8 +571,8 @@ func TestPGXIndexer(t *testing.T) { storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type = 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) diff --git a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go index 2ce5f494f..4349850ed 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -62,7 +62,7 @@ func setupLegacySQLX(t *testing.T) { } }() for _, node := range legacyData.StateDiffs { - err = ind.PushStateNode(tx, node) + err = ind.PushStateNode(tx, node, mockLegacyBlock.Hash().String()) require.NoError(t, err) } @@ -73,16 +73,16 @@ func TestSQLXIndexerLegacy(t *testing.T) { t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { setupLegacySQLX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, td, reward, block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, legacyData.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index 1861d65d4..09ee62fa3 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -179,16 +179,16 @@ func TestSQLXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupSQLX(t) defer tearDown(t) - pgStr := `SELECT cid, td, reward, id, base_fee + pgStr := `SELECT cid, td, reward, block_hash, base_fee FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed type res struct { - CID string - TD string - Reward string - ID int - BaseFee *int64 `db:"base_fee"` + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *int64 `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) @@ -218,7 +218,7 @@ func TestSQLXIndexer(t *testing.T) { defer tearDown(t) // check that txs were properly indexed trxs := make([]string, 0) - pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.id) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1` err = db.Select(context.Background(), &trxs, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -286,7 +286,7 @@ func TestSQLXIndexer(t *testing.T) { t.Fatalf("expected AccessListTxType (1), got %d", txType) } accessListElementModels := make([]models.AccessListElementModel, 0) - pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.id) WHERE cid = $1 ORDER BY access_list_element.index ASC` + pgStr = `SELECT access_list_element.* FROM eth.access_list_element INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_element.index ASC` err = db.Select(context.Background(), &accessListElementModels, pgStr, c) if err != nil { t.Fatal(err) @@ -325,8 +325,8 @@ func TestSQLXIndexer(t *testing.T) { rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 ORDER BY transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) @@ -343,8 +343,8 @@ func TestSQLXIndexer(t *testing.T) { } for i := range rcts { results := make([]logIPLD, 0) - pgStr = `SELECT log_cids.index, log_cids.address, log_cids.Topic0, log_cids.Topic1, data FROM eth.log_cids - INNER JOIN eth.receipt_cids ON (log_cids.receipt_id = receipt_cids.id) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id) INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` err = db.Select(context.Background(), &results, pgStr, rcts[i]) @@ -376,9 +376,9 @@ func TestSQLXIndexer(t *testing.T) { // check receipts were properly indexed rcts := make([]string, 0) pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids - WHERE receipt_cids.tx_id = transaction_cids.id - AND transaction_cids.header_id = header_cids.id - AND header_cids.block_number = $1 order by transaction_cids.id` + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 order by transaction_cids.index` err = db.Select(context.Background(), &rcts, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -472,8 +472,8 @@ func TestSQLXIndexer(t *testing.T) { defer tearDown(t) // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) - pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type != 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -492,9 +492,9 @@ func TestSQLXIndexer(t *testing.T) { if err != nil { t.Fatal(err) } - pgStr = `SELECT * from eth.state_accounts WHERE state_id = $1` + pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` var account models.StateAccountModel - err = db.Get(context.Background(), &account, pgStr, stateNode.ID) + err = db.Get(context.Background(), &account, pgStr, stateNode.HeaderID, stateNode.Path) if err != nil { t.Fatal(err) } @@ -504,8 +504,8 @@ func TestSQLXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "0", CodeHash: mocks.ContractCodeHash.Bytes(), StorageRoot: mocks.ContractRoot, @@ -518,8 +518,8 @@ func TestSQLXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) test_helpers.ExpectEqual(t, account, models.StateAccountModel{ - ID: account.ID, - StateID: stateNode.ID, + HeaderID: account.HeaderID, + StatePath: stateNode.Path, Balance: "1000", CodeHash: mocks.AccountCodeHash.Bytes(), StorageRoot: mocks.AccountRoot, @@ -530,8 +530,8 @@ func TestSQLXIndexer(t *testing.T) { // check that Removed state nodes were properly indexed and published stateNodes = make([]models.StateNodeModel, 0) - pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id - FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) WHERE header_cids.block_number = $1 AND node_type = 3` err = db.Select(context.Background(), &stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { @@ -563,8 +563,8 @@ func TestSQLXIndexer(t *testing.T) { storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type != 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) @@ -596,8 +596,8 @@ func TestSQLXIndexer(t *testing.T) { storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path FROM eth.storage_cids, eth.state_cids, eth.header_cids - WHERE storage_cids.state_id = state_cids.id - AND state_cids.header_id = header_cids.id + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash AND header_cids.block_number = $1 AND storage_cids.node_type = 3` err = db.Select(context.Background(), &storageNodes, pgStr, mocks.BlockNumber.Uint64()) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 96d13d956..94b38c7e1 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -42,7 +42,7 @@ func NewWriter(db Database) *Writer { /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) -ON CONFLICT (block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) +ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) */ func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { _, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(), diff --git a/statediff/service.go b/statediff/service.go index 3fc8ac60c..04aaac458 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -239,8 +239,14 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: + println("here") log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) + log.Info("Quitting the statediffing writing loop") + if err := sds.indexer.Close(); err != nil { + log.Error("Error closing indexer", "err", err) + } + return case <-sds.QuitChan: log.Info("Quitting the statediffing writing loop") if err := sds.indexer.Close(); err != nil { @@ -339,6 +345,9 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { case err := <-errCh: log.Error("Error from chain event subscription", "error", err) close(sds.QuitChan) + log.Info("Quitting the statediffing listening loop") + sds.close() + return case <-sds.QuitChan: log.Info("Quitting the statediffing listening loop") sds.close()