diff --git a/statediff/indexer/database/dump/indexer.go b/statediff/indexer/database/dump/indexer.go index b75fb1af9..faad44e85 100644 --- a/statediff/indexer/database/dump/indexer.go +++ b/statediff/indexer/database/dump/indexer.go @@ -184,10 +184,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) - var baseFee *int64 + var baseFee *string if header.BaseFee != nil { - baseFee = new(int64) - *baseFee = header.BaseFee.Int64() + baseFee = new(string) + *baseFee = header.BaseFee.String() } headerID := header.Hash().String() diff --git a/statediff/indexer/database/file/config.go b/statediff/indexer/database/file/config.go index 2553174a3..c2c6804c0 100644 --- a/statediff/indexer/database/file/config.go +++ b/statediff/indexer/database/file/config.go @@ -31,3 +31,15 @@ type Config struct { func (c Config) Type() shared.DBType { return shared.FILE } + +// TestConfig config for unit tests +var TestConfig = Config{ + FilePath: "./statediffing_test_file.sql", + NodeInfo: node.Info{ + GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + NetworkID: "1", + ChainID: 1, + ID: "mockNodeID", + ClientName: "go-ethereum", + }, +} diff --git a/statediff/indexer/database/file/indexer.go b/statediff/indexer/database/file/indexer.go index 454d1e3d1..5c2bbcb52 100644 --- a/statediff/indexer/database/file/indexer.go +++ b/statediff/indexer/database/file/indexer.go @@ -53,7 +53,7 @@ var ( // StateDiffIndexer satisfies the indexer.StateDiffIndexer interface for ethereum statediff objects on top of a void type StateDiffIndexer struct { - writer *SQLWriter + fileWriter *SQLWriter chainConfig *params.ChainConfig nodeID string wg *sync.WaitGroup @@ -79,7 +79,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, c w.upsertNode(config.NodeInfo) w.upsertIPLDDirect(shared.RemovedNodeMhKey, []byte{}) return &StateDiffIndexer{ - writer: w, + fileWriter: w, chainConfig: chainConfig, nodeID: config.NodeInfo.ID, wg: wg, @@ -133,7 +133,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip indexerMetrics.tStateStoreCodeProcessing.Update(tDiff) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) t = time.Now() - sdi.writer.Flush() + sdi.fileWriter.Flush() tDiff = time.Since(t) indexerMetrics.tPostgresCommit.Update(tDiff) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) @@ -189,15 +189,15 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // processHeader write a header IPLD insert SQL stmt to a file // it returns the headerID func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node.Node, reward, td *big.Int) string { - sdi.writer.upsertIPLDNode(headerNode) + sdi.fileWriter.upsertIPLDNode(headerNode) - var baseFee *int64 + var baseFee *string if header.BaseFee != nil { - baseFee = new(int64) - *baseFee = header.BaseFee.Int64() + baseFee = new(string) + *baseFee = header.BaseFee.String() } headerID := header.Hash().String() - sdi.writer.upsertHeaderCID(models.HeaderModel{ + sdi.fileWriter.upsertHeaderCID(models.HeaderModel{ NodeID: sdi.nodeID, CID: headerNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(headerNode.Cid()), @@ -221,7 +221,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode node func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, uncleNodes []*ipld2.EthHeader) { // publish and index uncles for _, uncleNode := range uncleNodes { - sdi.writer.upsertIPLDNode(uncleNode) + sdi.fileWriter.upsertIPLDNode(uncleNode) var uncleReward *big.Int // in PoA networks uncle reward is 0 if sdi.chainConfig.Clique != nil { @@ -229,7 +229,7 @@ func (sdi *StateDiffIndexer) processUncles(headerID string, blockNumber uint64, } else { uncleReward = shared.CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64()) } - sdi.writer.upsertUncleCID(models.UncleModel{ + sdi.fileWriter.upsertUncleCID(models.UncleModel{ HeaderID: headerID, CID: uncleNode.Cid().String(), MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()), @@ -261,10 +261,10 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { signer := types.MakeSigner(sdi.chainConfig, args.blockNumber) for i, receipt := range args.receipts { for _, logTrieNode := range args.logTrieNodes[i] { - sdi.writer.upsertIPLDNode(logTrieNode) + sdi.fileWriter.upsertIPLDNode(logTrieNode) } txNode := args.txNodes[i] - sdi.writer.upsertIPLDNode(txNode) + sdi.fileWriter.upsertIPLDNode(txNode) // index tx trx := args.txs[i] @@ -285,7 +285,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { MhKey: shared.MultihashKeyFromCID(txNode.Cid()), Type: trx.Type(), } - sdi.writer.upsertTransactionCID(txModel) + sdi.fileWriter.upsertTransactionCID(txModel) // index access list if this is one for j, accessListElement := range trx.AccessList() { @@ -299,7 +299,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { Address: accessListElement.Address.Hex(), StorageKeys: storageKeys, } - sdi.writer.upsertAccessListElement(accessListElementModel) + sdi.fileWriter.upsertAccessListElement(accessListElementModel) } // this is the contract address if this receipt is for a contract creation tx @@ -327,7 +327,7 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } else { rctModel.PostState = common.Bytes2Hex(receipt.PostState) } - sdi.writer.upsertReceiptCID(rctModel) + sdi.fileWriter.upsertReceiptCID(rctModel) // index logs logDataSet := make([]*models.LogsModel, len(receipt.Logs)) @@ -354,13 +354,13 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { Topic3: topicSet[3], } } - sdi.writer.upsertLogCID(logDataSet) + sdi.fileWriter.upsertLogCID(logDataSet) } // publish trie nodes, these aren't indexed directly for i, n := range args.txTrieNodes { - sdi.writer.upsertIPLDNode(n) - sdi.writer.upsertIPLDNode(args.rctTrieNodes[i]) + sdi.fileWriter.upsertIPLDNode(n) + sdi.fileWriter.upsertIPLDNode(args.rctTrieNodes[i]) } return nil @@ -380,10 +380,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: shared.RemovedNodeMhKey, NodeType: stateNode.NodeType.Int(), } - sdi.writer.upsertStateCID(stateModel) + sdi.fileWriter.upsertStateCID(stateModel) return nil } - stateCIDStr, stateMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) + stateCIDStr, stateMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing state node IPLD: %v", err) } @@ -396,7 +396,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt NodeType: stateNode.NodeType.Int(), } // index the state node - sdi.writer.upsertStateCID(stateModel) + sdi.fileWriter.upsertStateCID(stateModel) // if we have a leaf, decode and index the account data if stateNode.NodeType == sdtypes.Leaf { var i []interface{} @@ -418,7 +418,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt CodeHash: account.CodeHash, StorageRoot: account.Root.String(), } - sdi.writer.upsertStateAccount(accountModel) + sdi.fileWriter.upsertStateAccount(accountModel) } // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { @@ -434,10 +434,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: shared.RemovedNodeMhKey, NodeType: storageNode.NodeType.Int(), } - sdi.writer.upsertStorageCID(storageModel) + sdi.fileWriter.upsertStorageCID(storageModel) continue } - storageCIDStr, storageMhKey, err := sdi.writer.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) + storageCIDStr, storageMhKey, err := sdi.fileWriter.upsertIPLDRaw(ipld2.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) if err != nil { return fmt.Errorf("error generating and cacheing storage node IPLD: %v", err) } @@ -450,7 +450,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt MhKey: storageMhKey, NodeType: storageNode.NodeType.Int(), } - sdi.writer.upsertStorageCID(storageModel) + sdi.fileWriter.upsertStorageCID(storageModel) } return nil @@ -463,11 +463,11 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd if err != nil { return fmt.Errorf("error deriving multihash key from codehash: %v", err) } - sdi.writer.upsertIPLDDirect(mhKey, codeAndCodeHash.Code) + sdi.fileWriter.upsertIPLDDirect(mhKey, codeAndCodeHash.Code) return nil } // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { - return sdi.writer.Close() + return sdi.fileWriter.Close() } diff --git a/statediff/indexer/database/file/indexer_legacy_test.go b/statediff/indexer/database/file/indexer_legacy_test.go new file mode 100644 index 000000000..9259f4c13 --- /dev/null +++ b/statediff/indexer/database/file/indexer_legacy_test.go @@ -0,0 +1,132 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file_test + +import ( + "context" + "errors" + "os" + "testing" + + "github.com/ipfs/go-cid" + "github.com/jmoiron/sqlx" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/mocks" + "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" +) + +var ( + legacyData = mocks.NewLegacyData() + mockLegacyBlock *types.Block + legacyHeaderCID cid.Cid +) + +func setupLegacy(t *testing.T) { + mockLegacyBlock = legacyData.MockBlock + legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256) + if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { + err := os.Remove(file.TestConfig.FilePath) + require.NoError(t, err) + } + ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.TestConfig) + require.NoError(t, err) + var tx interfaces.Batch + tx, err = ind.PushBlock( + mockLegacyBlock, + legacyData.MockReceipts, + legacyData.MockBlock.Difficulty()) + require.NoError(t, err) + + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + if err := ind.Close(); err != nil { + t.Fatal(err) + } + }() + for _, node := range legacyData.StateDiffs { + err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String()) + require.NoError(t, err) + } + + test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, legacyData.BlockNumber.Uint64()) + + connStr := postgres.DefaultConfig.DbConnectionString() + + sqlxdb, err = sqlx.Connect("postgres", connStr) + if err != nil { + t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) + } +} + +func dumpData(t *testing.T) { + sqlFileBytes, err := os.ReadFile(file.TestConfig.FilePath) + require.NoError(t, err) + + _, err = sqlxdb.Exec(string(sqlFileBytes)) + require.NoError(t, err) +} + +func tearDown(t *testing.T) { + file.TearDownDB(t, sqlxdb) + err := os.Remove(file.TestConfig.FilePath) + require.NoError(t, err) + err = sqlxdb.Close() + require.NoError(t, err) +} + +func expectTrue(t *testing.T, value bool) { + if !value { + t.Fatalf("Assertion failed") + } +} + +func TestFileIndexerLegacy(t *testing.T) { + t.Run("Publish and index header IPLDs", func(t *testing.T) { + setupLegacy(t) + dumpData(t) + defer tearDown(t) + pgStr := `SELECT cid, td, reward, block_hash, base_fee + FROM eth.header_cids + WHERE block_number = $1` + // check header was properly indexed + type res struct { + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *string `db:"base_fee"` + } + header := new(res) + err = sqlxdb.QueryRowx(pgStr, legacyData.BlockNumber.Uint64()).StructScan(header) + require.NoError(t, err) + + test_helpers.ExpectEqual(t, header.CID, legacyHeaderCID.String()) + test_helpers.ExpectEqual(t, header.TD, legacyData.MockBlock.Difficulty().String()) + test_helpers.ExpectEqual(t, header.Reward, "5000000000000011250") + require.Nil(t, legacyData.MockHeader.BaseFee) + require.Nil(t, header.BaseFee) + }) +} diff --git a/statediff/indexer/database/file/indexer_test.go b/statediff/indexer/database/file/indexer_test.go new file mode 100644 index 000000000..b0489fc08 --- /dev/null +++ b/statediff/indexer/database/file/indexer_test.go @@ -0,0 +1,634 @@ +// VulcanizeDB +// Copyright © 2019 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package file_test + +import ( + "bytes" + "context" + "errors" + "fmt" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/models" + "github.com/ethereum/go-ethereum/statediff/indexer/shared" + + "github.com/ipfs/go-cid" + blockstore "github.com/ipfs/go-ipfs-blockstore" + dshelp "github.com/ipfs/go-ipfs-ds-help" + "github.com/jmoiron/sqlx" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/statediff/indexer/database/file" + "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" + "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" + "github.com/ethereum/go-ethereum/statediff/indexer/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/mocks" + "github.com/ethereum/go-ethereum/statediff/indexer/test_helpers" +) + +var ( + sqlxdb *sqlx.DB + err error + ind interfaces.StateDiffIndexer + ipfsPgGet = `SELECT data FROM public.blocks + WHERE key = $1` + tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte + mockBlock *types.Block + headerCID, trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid + rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid + state1CID, state2CID, storageCID cid.Cid +) + +func init() { + if os.Getenv("MODE") != "statediff" { + fmt.Println("Skipping statediff test") + os.Exit(0) + } + + mockBlock = mocks.MockBlock + txs, rcts := mocks.MockBlock.Transactions(), mocks.MockReceipts + + buf := new(bytes.Buffer) + txs.EncodeIndex(0, buf) + tx1 = make([]byte, buf.Len()) + copy(tx1, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(1, buf) + tx2 = make([]byte, buf.Len()) + copy(tx2, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(2, buf) + tx3 = make([]byte, buf.Len()) + copy(tx3, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(3, buf) + tx4 = make([]byte, buf.Len()) + copy(tx4, buf.Bytes()) + buf.Reset() + + txs.EncodeIndex(4, buf) + tx5 = make([]byte, buf.Len()) + copy(tx5, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(0, buf) + rct1 = make([]byte, buf.Len()) + copy(rct1, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(1, buf) + rct2 = make([]byte, buf.Len()) + copy(rct2, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(2, buf) + rct3 = make([]byte, buf.Len()) + copy(rct3, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(3, buf) + rct4 = make([]byte, buf.Len()) + copy(rct4, buf.Bytes()) + buf.Reset() + + rcts.EncodeIndex(4, buf) + rct5 = make([]byte, buf.Len()) + copy(rct5, buf.Bytes()) + buf.Reset() + + headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockHeaderRlp, multihash.KECCAK_256) + trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx1, multihash.KECCAK_256) + trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx2, multihash.KECCAK_256) + trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx3, multihash.KECCAK_256) + trx4CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx4, multihash.KECCAK_256) + trx5CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx5, multihash.KECCAK_256) + rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct1, multihash.KECCAK_256) + rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct2, multihash.KECCAK_256) + rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct3, multihash.KECCAK_256) + rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256) + rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256) + state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.ContractLeafNode, multihash.KECCAK_256) + state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.AccountLeafNode, multihash.KECCAK_256) + storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256) +} + +func setup(t *testing.T) { + if _, err := os.Stat(file.TestConfig.FilePath); !errors.Is(err, os.ErrNotExist) { + err := os.Remove(file.TestConfig.FilePath) + require.NoError(t, err) + } + ind, err = file.NewStateDiffIndexer(context.Background(), mocks.TestConfig, file.TestConfig) + require.NoError(t, err) + var tx interfaces.Batch + tx, err = ind.PushBlock( + mockBlock, + mocks.MockReceipts, + mocks.MockBlock.Difficulty()) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := tx.Submit(err); err != nil { + t.Fatal(err) + } + if err := ind.Close(); err != nil { + t.Fatal(err) + } + }() + for _, node := range mocks.StateDiffs { + err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) + require.NoError(t, err) + } + + test_helpers.ExpectEqual(t, tx.(*file.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) + + connStr := postgres.DefaultConfig.DbConnectionString() + + sqlxdb, err = sqlx.Connect("postgres", connStr) + if err != nil { + t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) + } +} + +func TestFileIndexer(t *testing.T) { + t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + pgStr := `SELECT cid, td, reward, block_hash, base_fee + FROM eth.header_cids + WHERE block_number = $1` + // check header was properly indexed + type res struct { + CID string + TD string + Reward string + BlockHash string `db:"block_hash"` + BaseFee *string `db:"base_fee"` + } + header := new(res) + err = sqlxdb.QueryRowx(pgStr, mocks.BlockNumber.Uint64()).StructScan(header) + if err != nil { + t.Fatal(err) + } + + test_helpers.ExpectEqual(t, header.CID, headerCID.String()) + test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) + test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") + test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String()) + dc, err := cid.Decode(header.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, mocks.MockHeaderRlp) + }) + t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + + // check that txs were properly indexed + trxs := make([]string, 0) + pgStr := `SELECT transaction_cids.cid FROM eth.transaction_cids INNER JOIN eth.header_cids ON (transaction_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_number = $1` + err = sqlxdb.Select(&trxs, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(trxs), 5) + expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String())) + expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String())) + // and published + for _, c := range trxs { + dc, err := cid.Decode(c) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + txTypePgStr := `SELECT tx_type FROM eth.transaction_cids WHERE cid = $1` + switch c { + case trx1CID.String(): + test_helpers.ExpectEqual(t, data, tx1) + var txType uint8 + err = sqlxdb.Get(&txType, txTypePgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != 0 { + t.Fatalf("expected LegacyTxType (0), got %d", txType) + } + case trx2CID.String(): + test_helpers.ExpectEqual(t, data, tx2) + var txType uint8 + err = sqlxdb.Get(&txType, txTypePgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != 0 { + t.Fatalf("expected LegacyTxType (0), got %d", txType) + } + case trx3CID.String(): + test_helpers.ExpectEqual(t, data, tx3) + var txType uint8 + err = sqlxdb.Get(&txType, txTypePgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != 0 { + t.Fatalf("expected LegacyTxType (0), got %d", txType) + } + case trx4CID.String(): + test_helpers.ExpectEqual(t, data, tx4) + var txType uint8 + err = sqlxdb.Get(&txType, txTypePgStr, c) + if err != nil { + t.Fatal(err) + } + if txType != types.AccessListTxType { + t.Fatalf("expected AccessListTxType (1), got %d", txType) + } + accessListElementModels := make([]models.AccessListElementModel, 0) + pgStr = `SELECT access_list_elements.* FROM eth.access_list_elements INNER JOIN eth.transaction_cids ON (tx_id = transaction_cids.tx_hash) WHERE cid = $1 ORDER BY access_list_elements.index ASC` + err = sqlxdb.Select(&accessListElementModels, pgStr, c) + if err != nil { + t.Fatal(err) + } + if len(accessListElementModels) != 2 { + t.Fatalf("expected two access list entries, got %d", len(accessListElementModels)) + } + model1 := models.AccessListElementModel{ + Index: accessListElementModels[0].Index, + Address: accessListElementModels[0].Address, + } + model2 := models.AccessListElementModel{ + Index: accessListElementModels[1].Index, + Address: accessListElementModels[1].Address, + StorageKeys: accessListElementModels[1].StorageKeys, + } + test_helpers.ExpectEqual(t, model1, mocks.AccessListEntry1Model) + test_helpers.ExpectEqual(t, model2, mocks.AccessListEntry2Model) + case trx5CID.String(): + test_helpers.ExpectEqual(t, data, tx5) + var txType *uint8 + err = sqlxdb.Get(&txType, txTypePgStr, c) + if err != nil { + t.Fatal(err) + } + if *txType != types.DynamicFeeTxType { + t.Fatalf("expected DynamicFeeTxType (2), got %d", *txType) + } + } + } + }) + + t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + + rcts := make([]string, 0) + pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 + ORDER BY transaction_cids.index` + err = sqlxdb.Select(&rcts, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + + type logIPLD struct { + Index int `db:"index"` + Address string `db:"address"` + Data []byte `db:"data"` + Topic0 string `db:"topic0"` + Topic1 string `db:"topic1"` + } + for i := range rcts { + results := make([]logIPLD, 0) + pgStr = `SELECT log_cids.index, log_cids.address, log_cids.topic0, log_cids.topic1, data FROM eth.log_cids + INNER JOIN eth.receipt_cids ON (log_cids.rct_id = receipt_cids.tx_id) + INNER JOIN public.blocks ON (log_cids.leaf_mh_key = blocks.key) + WHERE receipt_cids.leaf_cid = $1 ORDER BY eth.log_cids.index ASC` + err = sqlxdb.Select(&results, pgStr, rcts[i]) + require.NoError(t, err) + + // expecting MockLog1 and MockLog2 for mockReceipt4 + expectedLogs := mocks.MockReceipts[i].Logs + test_helpers.ExpectEqual(t, len(results), len(expectedLogs)) + + var nodeElements []interface{} + for idx, r := range results { + // Decode the log leaf node. + err = rlp.DecodeBytes(r.Data, &nodeElements) + require.NoError(t, err) + + logRaw, err := rlp.EncodeToBytes(expectedLogs[idx]) + require.NoError(t, err) + + // 2nd element of the leaf node contains the encoded log data. + test_helpers.ExpectEqual(t, logRaw, nodeElements[1].([]byte)) + } + } + }) + + t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + + // check receipts were properly indexed + rcts := make([]string, 0) + pgStr := `SELECT receipt_cids.leaf_cid FROM eth.receipt_cids, eth.transaction_cids, eth.header_cids + WHERE receipt_cids.tx_id = transaction_cids.tx_hash + AND transaction_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 order by transaction_cids.index` + err = sqlxdb.Select(&rcts, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(rcts), 5) + + for idx, rctLeafCID := range rcts { + result := make([]models.IPLDModel, 0) + pgStr = `SELECT data + FROM eth.receipt_cids + INNER JOIN public.blocks ON (receipt_cids.leaf_mh_key = public.blocks.key) + WHERE receipt_cids.leaf_cid = $1` + err = sqlxdb.Select(&result, pgStr, rctLeafCID) + if err != nil { + t.Fatal(err) + } + + // Decode the log leaf node. + var nodeElements []interface{} + err = rlp.DecodeBytes(result[0].Data, &nodeElements) + require.NoError(t, err) + + expectedRct, err := mocks.MockReceipts[idx].MarshalBinary() + require.NoError(t, err) + + test_helpers.ExpectEqual(t, expectedRct, nodeElements[1].([]byte)) + } + + // and published + for _, c := range rcts { + dc, err := cid.Decode(c) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + var data []byte + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE leaf_cid = $1` + switch c { + case rct1CID.String(): + test_helpers.ExpectEqual(t, data, rct1) + var postStatus uint64 + pgStr = `SELECT post_status FROM eth.receipt_cids WHERE leaf_cid = $1` + err = sqlxdb.Get(&postStatus, pgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postStatus, mocks.ExpectedPostStatus) + case rct2CID.String(): + test_helpers.ExpectEqual(t, data, rct2) + var postState string + err = sqlxdb.Get(&postState, postStatePgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState1) + case rct3CID.String(): + test_helpers.ExpectEqual(t, data, rct3) + var postState string + err = sqlxdb.Get(&postState, postStatePgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState2) + case rct4CID.String(): + test_helpers.ExpectEqual(t, data, rct4) + var postState string + err = sqlxdb.Get(&postState, postStatePgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + case rct5CID.String(): + test_helpers.ExpectEqual(t, data, rct5) + var postState string + err = sqlxdb.Get(&postState, postStatePgStr, c) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, postState, mocks.ExpectedPostState3) + } + } + }) + + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + + // check that state nodes were properly indexed and published + stateNodes := make([]models.StateNodeModel, 0) + pgStr := `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_number = $1 AND node_type != 3` + err = sqlxdb.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(stateNodes), 2) + for _, stateNode := range stateNodes { + var data []byte + dc, err := cid.Decode(stateNode.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + pgStr = `SELECT * from eth.state_accounts WHERE header_id = $1 AND state_path = $2` + var account models.StateAccountModel + err = sqlxdb.Get(&account, pgStr, stateNode.HeaderID, stateNode.Path) + if err != nil { + t.Fatal(err) + } + if stateNode.CID == state1CID.String() { + test_helpers.ExpectEqual(t, stateNode.NodeType, 2) + test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.ContractLeafKey).Hex()) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x06'}) + test_helpers.ExpectEqual(t, data, mocks.ContractLeafNode) + test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + HeaderID: account.HeaderID, + StatePath: stateNode.Path, + Balance: "0", + CodeHash: mocks.ContractCodeHash.Bytes(), + StorageRoot: mocks.ContractRoot, + Nonce: 1, + }) + } + if stateNode.CID == state2CID.String() { + test_helpers.ExpectEqual(t, stateNode.NodeType, 2) + test_helpers.ExpectEqual(t, stateNode.StateKey, common.BytesToHash(mocks.AccountLeafKey).Hex()) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x0c'}) + test_helpers.ExpectEqual(t, data, mocks.AccountLeafNode) + test_helpers.ExpectEqual(t, account, models.StateAccountModel{ + HeaderID: account.HeaderID, + StatePath: stateNode.Path, + Balance: "1000", + CodeHash: mocks.AccountCodeHash.Bytes(), + StorageRoot: mocks.AccountRoot, + Nonce: 0, + }) + } + } + + // check that Removed state nodes were properly indexed and published + stateNodes = make([]models.StateNodeModel, 0) + pgStr = `SELECT state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_number = $1 AND node_type = 3` + err = sqlxdb.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(stateNodes), 1) + stateNode := stateNodes[0] + var data []byte + dc, err := cid.Decode(stateNode.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, stateNode.CID, shared.RemovedNodeStateCID) + test_helpers.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) + test_helpers.ExpectEqual(t, data, []byte{}) + }) + + t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpData(t) + defer tearDown(t) + + // check that storage nodes were properly indexed + storageNodes := make([]models.StorageNodeWithStateKeyModel, 0) + pgStr := `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 + AND storage_cids.node_type != 3` + err = sqlxdb.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(storageNodes), 1) + test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ + CID: storageCID.String(), + NodeType: 2, + StorageKey: common.BytesToHash(mocks.StorageLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{}, + }) + var data []byte + dc, err := cid.Decode(storageNodes[0].CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, mocks.StorageLeafNode) + + // check that Removed storage nodes were properly indexed + storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) + pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE (storage_cids.state_path, storage_cids.header_id) = (state_cids.state_path, state_cids.header_id) + AND state_cids.header_id = header_cids.block_hash + AND header_cids.block_number = $1 + AND storage_cids.node_type = 3` + err = sqlxdb.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, len(storageNodes), 1) + test_helpers.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ + CID: shared.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }) + dc, err = cid.Decode(storageNodes[0].CID) + if err != nil { + t.Fatal(err) + } + mhKey = dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() + test_helpers.ExpectEqual(t, prefixedKey, shared.RemovedNodeMhKey) + err = sqlxdb.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + test_helpers.ExpectEqual(t, data, []byte{}) + }) +} diff --git a/statediff/indexer/database/file/test_helpers.go b/statediff/indexer/database/file/test_helpers.go new file mode 100644 index 000000000..27d204d55 --- /dev/null +++ b/statediff/indexer/database/file/test_helpers.go @@ -0,0 +1,64 @@ +package file + +import ( + "testing" + + "github.com/jmoiron/sqlx" +) + +// TearDownDB is used to tear down the watcher dbs after tests +func TearDownDB(t *testing.T, db *sqlx.DB) { + tx, err := db.Begin() + if err != nil { + t.Fatal(err) + } + + _, err = tx.Exec(`DELETE FROM eth.header_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.uncle_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.transaction_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.receipt_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.state_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.storage_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.state_accounts`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.access_list_elements`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM eth.log_cids`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM blocks`) + if err != nil { + t.Fatal(err) + } + _, err = tx.Exec(`DELETE FROM nodes`) + if err != nil { + t.Fatal(err) + } + err = tx.Commit() + if err != nil { + t.Fatal(err) + } +} diff --git a/statediff/indexer/database/file/writer.go b/statediff/indexer/database/file/writer.go index fdfa87b08..6329ecae1 100644 --- a/statediff/indexer/database/file/writer.go +++ b/statediff/indexer/database/file/writer.go @@ -18,7 +18,7 @@ package file import ( "fmt" - "os" + "io" blockstore "github.com/ipfs/go-ipfs-blockstore" dshelp "github.com/ipfs/go-ipfs-ds-help" @@ -38,7 +38,7 @@ var ( // SQLWriter writes sql statements to a file type SQLWriter struct { - file *os.File + wc io.WriteCloser stmts chan []byte collatedStmt []byte collationIndex int @@ -50,9 +50,9 @@ type SQLWriter struct { } // NewSQLWriter creates a new pointer to a Writer -func NewSQLWriter(file *os.File) *SQLWriter { +func NewSQLWriter(wc io.WriteCloser) *SQLWriter { return &SQLWriter{ - file: file, + wc: wc, stmts: make(chan []byte), collatedStmt: make([]byte, collatedStmtSize), flushChan: make(chan struct{}), @@ -100,7 +100,7 @@ func (sqw *SQLWriter) Loop() { func (sqw *SQLWriter) Close() error { close(sqw.quitChan) <-sqw.doneChan - return nil + return sqw.wc.Close() } // Flush sends a flush signal to the looping process @@ -110,7 +110,7 @@ func (sqw *SQLWriter) Flush() { } func (sqw *SQLWriter) flush() error { - if _, err := sqw.file.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil { + if _, err := sqw.wc.Write(sqw.collatedStmt[0:sqw.collationIndex]); err != nil { return err } sqw.collationIndex = 0 @@ -121,21 +121,21 @@ const ( nodeInsert = "INSERT INTO nodes (genesis_block, network_id, node_id, client_name, chain_id) VALUES " + "('%s', '%s', '%s', '%s', %d);\n" - ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '%x');\n" + ipldInsert = "INSERT INTO public.blocks (key, data) VALUES ('%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, " + "state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, %d);\n" + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, %s);\n" headerInsertWithoutBaseFee = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, " + "reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%x', %d, '%s', %d, NULL);\n" + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %d, NULL);\n" uncleInsert = "INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s');\n" txInsert = "INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) " + - "VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '%x', %d);\n" + "VALUES ('%s', '%s', '%s', '%s', '%s', %d, '%s', '\\x%x', %d);\n" alInsert = "INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ('%s', %d, '%s', '%s');\n" @@ -143,16 +143,16 @@ const ( "post_status, log_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', %d, '%s');\n" logInsert = "INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, " + - "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '%x');\n" + "topic3, log_data) VALUES ('%s', '%s', '%s', '%s', %d, '%s', '%s', '%s', '%s', '\\x%x');\n" stateInsert = "INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) " + - "VALUES ('%s', '%s', '%s', '%x', %d, %t, '%s');\n" + "VALUES ('%s', '%s', '%s', '\\x%x', %d, %t, '%s');\n" accountInsert = "INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) " + - "VALUES ('%s', '%x', '%s', %d, '%x', '%s');\n" + "VALUES ('%s', '\\x%x', '%s', %d, '\\x%x', '%s');\n" storageInsert = "INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, " + - "node_type, diff, mh_key) VALUES ('%s', '%x', '%s', '%s', '%x', %d, %t, '%s');\n" + "node_type, diff, mh_key) VALUES ('%s', '\\x%x', '%s', '%s', '\\x%x', %d, %t, '%s');\n" ) func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { @@ -199,7 +199,7 @@ func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { } else { stmt = fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, header.NodeID, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee) + header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, *header.BaseFee) } sqw.stmts <- []byte(stmt) indexerMetrics.blocks.Inc(1) diff --git a/statediff/indexer/database/sql/batch_writer.go b/statediff/indexer/database/sql/batch_writer.go deleted file mode 100644 index f186d8052..000000000 --- a/statediff/indexer/database/sql/batch_writer.go +++ /dev/null @@ -1,216 +0,0 @@ -// VulcanizeDB -// Copyright © 2021 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package sql - -/* -import ( - "fmt" - - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres" - - "github.com/ethereum/go-ethereum/statediff/indexer/models" - "github.com/jmoiron/sqlx" -) - -*/ -/* -// PG_MAX_PARAMS is the max number of placeholders+args a statement can support -// above this limit we need to split into a separate batch -const PG_MAX_PARAMS int = 32767 - -const ( - ipldInsertPgStr string = `INSERT INTO public.blocks (key, data) VALUES (unnest($1), unnest($2)) ON CONFLICT (key) DO NOTHING` - headerCIDsPgStr string = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) - VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9), unnest($10), unnest($11), unnest($12), unnest($13), unnest($14), unnest($15), unnest($16)) - ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = (excluded.parent_hash, excluded.cid, excluded.td, excluded.node_id, excluded.reward, excluded.state_root, excluded.tx_root, excluded.receipt_root, excluded.uncle_root, excluded.bloom, excluded.timestamp, excluded.mh_key, eth.header_cids.times_validated + 1, excluded.base_fee) - RETURNING id` - unclesCIDsPgStr string = `INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6)) - ON CONFLICT (header_id, block_hash) DO UPDATE SET (parent_hash, cid, reward, mh_key) = (excluded.parent_hash, excluded.cid, excluded.reward, excluded.mh_key)` - txCIDsPgStr string = `INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9)) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = (excluded.cid, excluded.dst, excluded.src, excluded.index, excluded.mh_key, excluded.tx_data, excluded.tx_type) - RETURNING id` - accessListPgStr string = `INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES (unnest($1), unnest($2), unnest($3), unnest($4)) - ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = (excluded.address, excluded.storage_keys)` - rctCIDsPgStr string = `INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8)) - ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = (excluded.leaf_cid, excluded.contract, excluded.contract_hash, excluded.leaf_mh_key, excluded.post_state, excluded.post_status, excluded.log_root) - RETURNING id` - logCIDsPgStr string = `INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7), unnest($8), unnest($9), unnest($10)) - ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = (excluded.leaf_cid, excluded.leaf_mh_key, excluded.address, excluded.topic0, excluded.topic1, excluded.topic2, excluded.topic3, excluded.log_data)` - stateCIDsPgStr string = `INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7)) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = (excluded.state_leaf_key, excluded.cid, excluded.node_type, excluded.diff, excluded.mh_key) - RETURNING id` - stateAccountsPgStr string = `INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5)) - ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = (excluded.balance, excluded.nonce, excluded.code_hash, excluded.storage_root)` - storageCIDsPgStr string = `INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES (unnest($1), unnest($2), unnest($3), unnest($4), unnest($5), unnest($6), unnest($7)) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = (excluded.storage_leaf_key, excluded.cid, excluded.node_type, excluded.diff, excluded.mh_key)` -) - -// PostgresBatchWriter is used to write statediff data to Postgres using batch inserts/upserts -type PostgresBatchWriter struct { - db *postgres.DB - - // prepared statements (prepared inside tx) - ipldsPreparedStm *sqlx.Stmt - unclesPrepared *sqlx.Stmt - txPreparedStm *sqlx.Stmt - accessListPreparedStm *sqlx.Stmt - rctPreparedStm *sqlx.Stmt - logPreparedStm *sqlx.Stmt - statePreparedStm *sqlx.Stmt - accountPreparedStm *sqlx.Stmt - storagePreparedStm *sqlx.Stmt - - // cached arguments - queuedHeaderArgs models.HeaderModel - queuedUnclesArgs models.UncleBatch - queuedTxArgs models.TxBatch - queuedAccessListArgs models.AccessListBatch - queuedRctArgs models.ReceiptBatch - queuedLogArgs models.LogBatch - queuedStateArgs models.StateBatch - queuedAccountArgs models.AccountBatch - queuedStorageArgs models.StorageBatch -} - -// NewPostgresBatchWriter creates a new pointer to a PostgresBatchWriter -func NewPostgresBatchWriter(db *postgres.DB) *PostgresBatchWriter { - return &PostgresBatchWriter{ - db: db, - } -} - -func (pbw *PostgresBatchWriter) queueHeader(header models.HeaderModel) { - pbw.queuedHeaderArgs = header -} - -func (pbw *PostgresBatchWriter) queueUncle(uncle models.UncleModel) { - pbw.queuedUnclesArgs.BlockHashes = append(pbw.queuedUnclesArgs.BlockHashes, uncle.BlockHash) - pbw.queuedUnclesArgs.ParentHashes = append(pbw.queuedUnclesArgs.ParentHashes, uncle.ParentHash) - pbw.queuedUnclesArgs.CIDs = append(pbw.queuedUnclesArgs.CIDs, uncle.CID) - pbw.queuedUnclesArgs.MhKeys = append(pbw.queuedUnclesArgs.MhKeys, uncle.MhKey) - pbw.queuedUnclesArgs.Rewards = append(pbw.queuedUnclesArgs.Rewards, uncle.Reward) -} - -func (pbw *PostgresBatchWriter) queueTransaction(tx models.TxModel) { - pbw.queuedTxArgs.Indexes = append(pbw.queuedTxArgs.Indexes, tx.Index) - pbw.queuedTxArgs.TxHashes = append(pbw.queuedTxArgs.TxHashes, tx.TxHash) - pbw.queuedTxArgs.CIDs = append(pbw.queuedTxArgs.CIDs, tx.CID) - pbw.queuedTxArgs.MhKeys = append(pbw.queuedTxArgs.MhKeys, tx.MhKey) - pbw.queuedTxArgs.Dsts = append(pbw.queuedTxArgs.Dsts, tx.Dst) - pbw.queuedTxArgs.Srcs = append(pbw.queuedTxArgs.Srcs, tx.Src) - pbw.queuedTxArgs.Datas = append(pbw.queuedTxArgs.Datas, tx.Data) - pbw.queuedTxArgs.Types = append(pbw.queuedTxArgs.Types, tx.Type) -} - -func (pbw *PostgresBatchWriter) queueAccessListElement(al models.AccessListElementModel) { - -} - -func (pbw *PostgresBatchWriter) queueReceipt(rct models.ReceiptModel) { - -} - -func (pbw *PostgresBatchWriter) upsertTransactionCID(tx *sqlx.Tx, transaction models.TxModel, headerID int64) (int64, error) { - var txID int64 - err := tx.QueryRowx(`INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (header_id, tx_hash) DO UPDATE SET (cid, dst, src, index, mh_key, tx_data, tx_type) = ($3, $4, $5, $6, $7, $8, $9) - RETURNING id`, - headerID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type).Scan(&txID) - if err != nil { - return 0, fmt.Errorf("error upserting transaction_cids entry: %v", err) - } - indexerMetrics.transactions.Inc(1) - return txID, nil -} - -func (pbw *PostgresBatchWriter) upsertAccessListElement(tx *sqlx.Tx, accessListElement models.AccessListElementModel, txID int64) error { - _, err := tx.Exec(`INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) - ON CONFLICT (tx_id, index) DO UPDATE SET (address, storage_keys) = ($3, $4)`, - txID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) - if err != nil { - return fmt.Errorf("error upserting access_list_element entry: %v", err) - } - indexerMetrics.accessListEntries.Inc(1) - return nil -} - -func (pbw *PostgresBatchWriter) upsertReceiptCID(tx *sqlx.Tx, rct *models.ReceiptModel, txID int64) (int64, error) { - var receiptID int64 - err := tx.QueryRowx(`INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (tx_id) DO UPDATE SET (leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) = ($2, $3, $4, $5, $6, $7, $8) - RETURNING id`, - txID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot).Scan(&receiptID) - if err != nil { - return 0, fmt.Errorf("error upserting receipt_cids entry: %w", err) - } - indexerMetrics.receipts.Inc(1) - return receiptID, nil -} - -func (pbw *PostgresBatchWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel, receiptID int64) error { - for _, log := range logs { - _, err := tx.Exec(`INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, receipt_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ON CONFLICT (receipt_id, index) DO UPDATE SET (leaf_cid, leaf_mh_key, address, topic0, topic1, topic2, topic3, log_data) = ($1, $2, $4, $6, $7, $8, $9, $10)`, - log.LeafCID, log.LeafMhKey, receiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) - if err != nil { - return fmt.Errorf("error upserting logs entry: %w", err) - } - indexerMetrics.logs.Inc(1) - } - return nil -} - -func (pbw *PostgresBatchWriter) upsertStateCID(tx *sqlx.Tx, stateNode models.StateNodeModel, headerID int64) (int64, error) { - var stateID int64 - var stateKey string - if stateNode.StateKey != nullHash.String() { - stateKey = stateNode.StateKey - } - err := tx.QueryRowx(`INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) - RETURNING id`, - headerID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey).Scan(&stateID) - if err != nil { - return 0, fmt.Errorf("error upserting state_cids entry: %v", err) - } - return stateID, nil -} - -func (pbw *PostgresBatchWriter) upsertStateAccount(tx *sqlx.Tx, stateAccount models.StateAccountModel, stateID int64) error { - _, err := tx.Exec(`INSERT INTO eth.state_accounts (state_id, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5) - ON CONFLICT (state_id) DO UPDATE SET (balance, nonce, code_hash, storage_root) = ($2, $3, $4, $5)`, - stateID, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) - if err != nil { - return fmt.Errorf("error upserting state_accounts entry: %v", err) - } - return nil -} - -func (pbw *PostgresBatchWriter) upsertStorageCID(tx *sqlx.Tx, storageCID models.StorageNodeModel, stateID int64) error { - var storageKey string - if storageCID.StorageKey != nullHash.String() { - storageKey = storageCID.StorageKey - } - _, err := tx.Exec(`INSERT INTO eth.storage_cids (state_id, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) - ON CONFLICT (state_id, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7)`, - stateID, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) - if err != nil { - return fmt.Errorf("error upserting storage_cids entry: %v", err) - } - return nil -} -*/ diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index b557ec903..d5f7c3660 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -235,10 +235,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode node.Node, reward, td *big.Int) (string, error) { tx.cacheIPLD(headerNode) - var baseFee *int64 + var baseFee *string if header.BaseFee != nil { - baseFee = new(int64) - *baseFee = header.BaseFee.Int64() + baseFee = new(string) + *baseFee = header.BaseFee.String() } headerID := header.Hash().String() // index header @@ -541,5 +541,5 @@ func (sdi *StateDiffIndexer) PushCodeAndCodeHash(batch interfaces.Batch, codeAnd // Close satisfies io.Closer func (sdi *StateDiffIndexer) Close() error { - return sdi.dbWriter.db.Close() + return sdi.dbWriter.Close() } diff --git a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go index 21b74b3b2..7dc38a3ca 100644 --- a/statediff/indexer/database/sql/pgx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_legacy_test.go @@ -60,7 +60,7 @@ func setupLegacyPGX(t *testing.T) { } func TestPGXIndexerLegacy(t *testing.T) { - t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { + t.Run("Publish and index header IPLDs", func(t *testing.T) { setupLegacyPGX(t) defer tearDown(t) pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee diff --git a/statediff/indexer/database/sql/pgx_indexer_test.go b/statediff/indexer/database/sql/pgx_indexer_test.go index 710ad23d9..a378424db 100644 --- a/statediff/indexer/database/sql/pgx_indexer_test.go +++ b/statediff/indexer/database/sql/pgx_indexer_test.go @@ -140,9 +140,7 @@ func setupPGX(t *testing.T) { }() for _, node := range mocks.StateDiffs { err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) @@ -152,7 +150,7 @@ func TestPGXIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupPGX(t) defer tearDown(t) - pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, base_fee + pgStr := `SELECT cid, cast(td AS TEXT), cast(reward AS TEXT), block_hash, cast(base_fee AS TEXT) FROM eth.header_cids WHERE block_number = $1` // check header was properly indexed @@ -160,8 +158,8 @@ func TestPGXIndexer(t *testing.T) { CID string TD string Reward string - BlockHash string `db:"block_hash"` - BaseFee *int64 `db:"base_fee"` + BlockHash string `db:"block_hash"` + BaseFee *string `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).Scan( @@ -176,7 +174,7 @@ func TestPGXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, header.CID, headerCID.String()) test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") - test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64()) + test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String()) dc, err := cid.Decode(header.CID) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index a7c7cc9b4..5794bd0af 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -49,9 +49,9 @@ func ResolveDriverType(str string) (DriverType, error) { var DefaultConfig = Config{ Hostname: "localhost", Port: 5432, - DatabaseName: "vulcanize_test", + DatabaseName: "vulcanize_testing", Username: "postgres", - Password: "", + Password: "password", } // Config holds params for a Postgres db diff --git a/statediff/indexer/database/sql/postgres/pgx_test.go b/statediff/indexer/database/sql/postgres/pgx_test.go index ea66737f5..64616e356 100644 --- a/statediff/indexer/database/sql/postgres/pgx_test.go +++ b/statediff/indexer/database/sql/postgres/pgx_test.go @@ -47,10 +47,10 @@ func TestPostgresPGX(t *testing.T) { if err != nil { t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err) } - defer dbPool.Close() if dbPool == nil { t.Fatal("DB pool is nil") } + dbPool.Close() }) t.Run("serializes big.Int to db", func(t *testing.T) { diff --git a/statediff/indexer/database/sql/postgres/sqlx_test.go b/statediff/indexer/database/sql/postgres/sqlx_test.go index 95975a868..03f24e9f5 100644 --- a/statediff/indexer/database/sql/postgres/sqlx_test.go +++ b/statediff/indexer/database/sql/postgres/sqlx_test.go @@ -39,11 +39,15 @@ func TestPostgresSQLX(t *testing.T) { sqlxdb, err = sqlx.Connect("postgres", connStr) if err != nil { - t.Fatalf("failed to connect to db with connection string: %s err: %v", pgConfig.ConnString(), err) + t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err) } if sqlxdb == nil { t.Fatal("DB is nil") } + err = sqlxdb.Close() + if err != nil { + t.Fatal(err) + } }) t.Run("serializes big.Int to db", func(t *testing.T) { @@ -59,9 +63,7 @@ func TestPostgresSQLX(t *testing.T) { if err != nil { t.Fatal(err) } - if err != nil { - t.Fatal(err) - } + defer db.Close() bi := new(big.Int) bi.SetString("34940183920000000000", 10) diff --git a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go index 4349850ed..d0ed3568b 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_legacy_test.go @@ -70,7 +70,7 @@ func setupLegacySQLX(t *testing.T) { } func TestSQLXIndexerLegacy(t *testing.T) { - t.Run("Publish and index header IPLDs in a legacy tx", func(t *testing.T) { + t.Run("Publish and index header IPLDs", func(t *testing.T) { setupLegacySQLX(t) defer tearDown(t) pgStr := `SELECT cid, td, reward, block_hash, base_fee diff --git a/statediff/indexer/database/sql/sqlx_indexer_test.go b/statediff/indexer/database/sql/sqlx_indexer_test.go index e0b5f2967..68f06ecaa 100644 --- a/statediff/indexer/database/sql/sqlx_indexer_test.go +++ b/statediff/indexer/database/sql/sqlx_indexer_test.go @@ -160,9 +160,7 @@ func setupSQLX(t *testing.T) { }() for _, node := range mocks.StateDiffs { err = ind.PushStateNode(tx, node, mockBlock.Hash().String()) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } test_helpers.ExpectEqual(t, tx.(*sql.BatchTx).BlockNumber, mocks.BlockNumber.Uint64()) @@ -187,8 +185,8 @@ func TestSQLXIndexer(t *testing.T) { CID string TD string Reward string - BlockHash string `db:"block_hash"` - BaseFee *int64 `db:"base_fee"` + BlockHash string `db:"block_hash"` + BaseFee *string `db:"base_fee"` } header := new(res) err = db.QueryRow(context.Background(), pgStr, mocks.BlockNumber.Uint64()).(*sqlx.Row).StructScan(header) @@ -198,7 +196,7 @@ func TestSQLXIndexer(t *testing.T) { test_helpers.ExpectEqual(t, header.CID, headerCID.String()) test_helpers.ExpectEqual(t, header.TD, mocks.MockBlock.Difficulty().String()) test_helpers.ExpectEqual(t, header.Reward, "2000000000000021250") - test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.Int64()) + test_helpers.ExpectEqual(t, *header.BaseFee, mocks.MockHeader.BaseFee.String()) dc, err := cid.Decode(header.CID) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/test_helpers.go b/statediff/indexer/database/sql/test_helpers.go index 46f9b766b..b1032f8ff 100644 --- a/statediff/indexer/database/sql/test_helpers.go +++ b/statediff/indexer/database/sql/test_helpers.go @@ -33,6 +33,10 @@ func TearDownDB(t *testing.T, db Database) { if err != nil { t.Fatal(err) } + _, err = tx.Exec(ctx, `DELETE FROM eth.uncle_cids`) + if err != nil { + t.Fatal(err) + } _, err = tx.Exec(ctx, `DELETE FROM eth.transaction_cids`) if err != nil { t.Fatal(err) @@ -65,6 +69,10 @@ func TearDownDB(t *testing.T, db Database) { if err != nil { t.Fatal(err) } + _, err = tx.Exec(ctx, `DELETE FROM nodes`) + if err != nil { + t.Fatal(err) + } err = tx.Commit(ctx) if err != nil { t.Fatal(err) diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 3089b6d50..f426263e0 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -39,14 +39,19 @@ func NewWriter(db Database) *Writer { } } +// Close satisfies io.Closer +func (w *Writer) Close() error { + return w.db.Close() +} + /* INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash) DO UPDATE SET (block_number, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, base_fee) = ($1, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16) */ -func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertHeaderStm(), - header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, in.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot, +func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertHeaderStm(), + header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, header.TotalDifficulty, w.db.NodeID(), header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.BaseFee) if err != nil { return fmt.Errorf("error upserting header_cids entry: %v", err) @@ -59,8 +64,8 @@ func (in *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { INSERT INTO eth.uncle_cids (block_hash, header_id, parent_hash, cid, reward, mh_key) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (block_hash) DO NOTHING */ -func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertUncleStm(), +func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { return fmt.Errorf("error upserting uncle_cids entry: %v", err) @@ -72,8 +77,8 @@ func (in *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { INSERT INTO eth.transaction_cids (header_id, tx_hash, cid, dst, src, index, mh_key, tx_data, tx_type) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (tx_hash) DO NOTHING */ -func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertTxStm(), +func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertTxStm(), transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type) if err != nil { return fmt.Errorf("error upserting transaction_cids entry: %v", err) @@ -86,8 +91,8 @@ func (in *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error INSERT INTO eth.access_list_elements (tx_id, index, address, storage_keys) VALUES ($1, $2, $3, $4) ON CONFLICT (tx_id, index) DO NOTHING */ -func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertAccessListElementStm(), +func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessListElementModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertAccessListElementStm(), accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { return fmt.Errorf("error upserting access_list_element entry: %v", err) @@ -100,8 +105,8 @@ func (in *Writer) upsertAccessListElement(tx Tx, accessListElement models.Access INSERT INTO eth.receipt_cids (tx_id, leaf_cid, contract, contract_hash, leaf_mh_key, post_state, post_status, log_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (tx_id) DO NOTHING */ -func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertRctStm(), +func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertRctStm(), rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { return fmt.Errorf("error upserting receipt_cids entry: %w", err) @@ -114,9 +119,9 @@ func (in *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { INSERT INTO eth.log_cids (leaf_cid, leaf_mh_key, rct_id, address, index, topic0, topic1, topic2, topic3, log_data) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (rct_id, index) DO NOTHING */ -func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { +func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { for _, log := range logs { - _, err := tx.Exec(in.db.Context(), in.db.InsertLogStm(), + _, err := tx.Exec(w.db.Context(), w.db.InsertLogStm(), log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) @@ -130,12 +135,12 @@ func (in *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { INSERT INTO eth.state_cids (header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (header_id, state_path) DO UPDATE SET (state_leaf_key, cid, node_type, diff, mh_key) = ($2, $3, $5, $6, $7) */ -func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { +func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { var stateKey string if stateNode.StateKey != nullHash.String() { stateKey = stateNode.StateKey } - _, err := tx.Exec(in.db.Context(), in.db.InsertStateStm(), + _, err := tx.Exec(w.db.Context(), w.db.InsertStateStm(), stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { return fmt.Errorf("error upserting state_cids entry: %v", err) @@ -147,8 +152,8 @@ func (in *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { INSERT INTO eth.state_accounts (header_id, state_path, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (header_id, state_path) DO NOTHING */ -func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { - _, err := tx.Exec(in.db.Context(), in.db.InsertAccountStm(), +func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel) error { + _, err := tx.Exec(w.db.Context(), w.db.InsertAccountStm(), stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { return fmt.Errorf("error upserting state_accounts entry: %v", err) @@ -160,12 +165,12 @@ func (in *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountMode INSERT INTO eth.storage_cids (header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (header_id, state_path, storage_path) DO UPDATE SET (storage_leaf_key, cid, node_type, diff, mh_key) = ($3, $4, $6, $7, $8) */ -func (in *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { +func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) error { var storageKey string if storageCID.StorageKey != nullHash.String() { storageKey = storageCID.StorageKey } - _, err := tx.Exec(in.db.Context(), in.db.InsertStorageStm(), + _, err := tx.Exec(w.db.Context(), w.db.InsertStorageStm(), storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { return fmt.Errorf("error upserting storage_cids entry: %v", err) diff --git a/statediff/indexer/mocks/test_data.go b/statediff/indexer/mocks/test_data.go index f437dc8e4..be61edb87 100644 --- a/statediff/indexer/mocks/test_data.go +++ b/statediff/indexer/mocks/test_data.go @@ -22,18 +22,16 @@ import ( "crypto/rand" "math/big" - "github.com/ethereum/go-ethereum/statediff/indexer/models" - - "github.com/ethereum/go-ethereum/trie" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/statediff/indexer/models" "github.com/ethereum/go-ethereum/statediff/test_helpers" sdtypes "github.com/ethereum/go-ethereum/statediff/types" + "github.com/ethereum/go-ethereum/trie" ) // Test variables diff --git a/statediff/indexer/models/models.go b/statediff/indexer/models/models.go index d37aa5449..e471d20ec 100644 --- a/statediff/indexer/models/models.go +++ b/statediff/indexer/models/models.go @@ -26,22 +26,22 @@ type IPLDModel struct { // HeaderModel is the db model for eth.header_cids type HeaderModel struct { - BlockNumber string `db:"block_number"` - BlockHash string `db:"block_hash"` - ParentHash string `db:"parent_hash"` - CID string `db:"cid"` - MhKey string `db:"mh_key"` - TotalDifficulty string `db:"td"` - NodeID string `db:"node_id"` - Reward string `db:"reward"` - StateRoot string `db:"state_root"` - UncleRoot string `db:"uncle_root"` - TxRoot string `db:"tx_root"` - RctRoot string `db:"receipt_root"` - Bloom []byte `db:"bloom"` - Timestamp uint64 `db:"timestamp"` - TimesValidated int64 `db:"times_validated"` - BaseFee *int64 `db:"base_fee"` + BlockNumber string `db:"block_number"` + BlockHash string `db:"block_hash"` + ParentHash string `db:"parent_hash"` + CID string `db:"cid"` + MhKey string `db:"mh_key"` + TotalDifficulty string `db:"td"` + NodeID string `db:"node_id"` + Reward string `db:"reward"` + StateRoot string `db:"state_root"` + UncleRoot string `db:"uncle_root"` + TxRoot string `db:"tx_root"` + RctRoot string `db:"receipt_root"` + Bloom []byte `db:"bloom"` + Timestamp uint64 `db:"timestamp"` + TimesValidated int64 `db:"times_validated"` + BaseFee *string `db:"base_fee"` } // UncleModel is the db model for eth.uncle_cids