diff --git a/.gitea/workflows/test.yml b/.gitea/workflows/test.yml index d302e68..e203eec 100644 --- a/.gitea/workflows/test.yml +++ b/.gitea/workflows/test.yml @@ -9,7 +9,8 @@ on: - ci-test env: - SO_VERSION: v1.1.0-87fffca-202404110321 + SO_VERSION: roysc/fix-various + FIXTURENET_ETH_STACKS_VERSION: plugeth-stack jobs: unit-tests: @@ -31,34 +32,57 @@ jobs: name: Run integration tests runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 with: path: ./plugeth-statediff + progress: false - name: Build docker image run: | docker build ./plugeth-statediff -t cerc/plugeth-statediff:local + - name: "Install Python for ARM on Linux" + if: ${{ runner.arch == 'arm64' && runner.os == 'Linux' }} + uses: deadsnakes/action@v3.0.1 + with: + python-version: 3.11 + - name: "Install Python cases other than ARM on Linux" + if: ${{ ! (runner.arch == 'arm64' && runner.os == 'Linux') }} + uses: actions/setup-python@v4 + with: + python-version: 3.11 + - name: "Print Python version" + run: python3 --version - name: Install stack-orchestrator - run: | - curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so - chmod +x laconic-so - - name: Clone system-tests + # run: | + # curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so + # chmod +x laconic-so + # echo PATH="$PATH:$(pwd)" >> $GITHUB_ENV + # FIXME: merge SO fixes and revert uses: actions/checkout@v3 + with: + repository: cerc-io/stack-orchestrator + ref: ${{ env.SO_VERSION }} + path: .tools/stack-orchestrator + - name: "Install stack orchestrator" + run: pip3 install .tools/stack-orchestrator + - name: Clone system-tests + uses: actions/checkout@v4 with: repository: cerc-io/system-tests ref: v0.1.0-20240411 path: ./system-tests token: ${{ secrets.CICD_REPO_TOKEN }} + progress: false + - name: Clone fixturenet stack repo + uses: actions/checkout@v4 + with: + repository: cerc-io/fixturenet-eth-stacks + ref: ${{ env.FIXTURENET_ETH_STACKS_VERSION }} + path: ./fixturenet-eth-stacks + progress: false - name: Run testnet stack working-directory: ./plugeth-statediff - env: - LACONIC_SO: ../laconic-so - run: ./scripts/integration-setup.sh - - name: Install Python - uses: actions/setup-python@v5 - with: - python-version: 3.11 - + run: ./scripts/run-test-stack.sh - name: Run tests working-directory: ./system-tests run: | diff --git a/Dockerfile b/Dockerfile index 820d80e..b74f77d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # Using image with same alpine as plugeth, # but go 1.21 to evade https://github.com/Consensys/gnark-crypto/issues/468 -FROM golang:1.21-alpine3.18 as builder +FROM golang:1.21-alpine as builder RUN apk add --no-cache gcc musl-dev binutils-gold linux-headers git diff --git a/indexer/database/dump/indexer.go b/indexer/database/dump/indexer.go index 34f7ec0..cf84afa 100644 --- a/indexer/database/dump/indexer.go +++ b/indexer/database/dump/indexer.go @@ -82,7 +82,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -123,15 +123,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(blockTx, processArgs{ + err = sdi.processObjects(blockTx, processArgs{ headerID: headerID, blockNumber: block.Number(), blockTime: block.Time(), receipts: receipts, txs: transactions, + withdrawals: block.Withdrawals(), rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -151,7 +153,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He if !ok { return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) } - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -173,6 +175,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), } _, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod) return headerID, err @@ -225,13 +228,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres -func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { +// processObjects publishes and indexes receipt and transaction IPLDs in Postgres +func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -314,6 +319,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } } + // Process withdrawals + for i, withdrawal := range args.withdrawals { + wdNode := args.wdNodes[i] + tx.cacheIPLD(wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: withdrawal.Index, + Validator: withdrawal.Validator, + Address: withdrawal.Address.String(), + Amount: withdrawal.Amount, + } + if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", wdModel); err != nil { + return err + } + } return nil } diff --git a/indexer/database/file/csv_indexer_legacy_test.go b/indexer/database/file/csv_indexer_legacy_test.go index 411b90b..1bb0d46 100644 --- a/indexer/database/file/csv_indexer_legacy_test.go +++ b/indexer/database/file/csv_indexer_legacy_test.go @@ -24,6 +24,7 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/stretchr/testify/require" @@ -55,6 +56,8 @@ func setupLegacyCSVIndexer(t *testing.T) { func setupLegacyCSV(t *testing.T) { setupLegacyCSVIndexer(t) test.SetupLegacyTestData(t, ind) + t.Cleanup(func() { tearDownCSV(t) }) + time.Sleep(delayForDockerSync) } func dumpCSVFileData(t *testing.T) { @@ -64,7 +67,7 @@ func dumpCSVFileData(t *testing.T) { localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir) - for _, tbl := range file.Tables { + for _, tbl := range schema.Tables { err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name)) require.NoError(t, err) @@ -89,6 +92,7 @@ func dumpCSVFileData(t *testing.T) { func resetAndDumpWatchedAddressesCSVFileData(t *testing.T) { test_helpers.TearDownDB(t, db) + time.Sleep(delayForDockerSync) outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath) stmt := fmt.Sprintf(pgCopyStatement, schema.TableWatchedAddresses.Name, outputFilePath) @@ -111,7 +115,6 @@ func TestLegacyCSVFileIndexer(t *testing.T) { t.Run("Publish and index header IPLDs", func(t *testing.T) { setupLegacyCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.TestLegacyIndexer(t, db) }) diff --git a/indexer/database/file/csv_indexer_test.go b/indexer/database/file/csv_indexer_test.go index 98095f2..f1453b6 100644 --- a/indexer/database/file/csv_indexer_test.go +++ b/indexer/database/file/csv_indexer_test.go @@ -21,6 +21,7 @@ import ( "math/big" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -30,6 +31,9 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/test" ) +// docker bind mount is slow to sync files +var delayForDockerSync = 1 * time.Second + func setupCSVIndexer(t *testing.T) { if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) { err := os.RemoveAll(file.CSVTestConfig.OutputDir) @@ -53,18 +57,21 @@ func setupCSVIndexer(t *testing.T) { func setupCSV(t *testing.T) { setupCSVIndexer(t) test.SetupTestData(t, ind) + t.Cleanup(func() { tearDownCSV(t) }) + time.Sleep(delayForDockerSync) } func setupCSVNonCanonical(t *testing.T) { setupCSVIndexer(t) test.SetupTestDataNonCanonical(t, ind) + t.Cleanup(func() { tearDownCSV(t) }) + time.Sleep(delayForDockerSync) } func TestCSVFileIndexer(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexHeaderIPLDs(t, db) }) @@ -72,7 +79,6 @@ func TestCSVFileIndexer(t *testing.T) { t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexTransactionIPLDs(t, db) }) @@ -80,7 +86,6 @@ func TestCSVFileIndexer(t *testing.T) { t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexLogIPLDs(t, db) }) @@ -88,15 +93,20 @@ func TestCSVFileIndexer(t *testing.T) { t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexReceiptIPLDs(t, db) }) + t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) { + setupCSV(t) + dumpCSVFileData(t) + + test.DoTestPublishAndIndexWithdrawalIPLDs(t, db) + }) + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexStateIPLDs(t, db) }) @@ -104,7 +114,6 @@ func TestCSVFileIndexer(t *testing.T) { t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { setupCSV(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexStorageIPLDs(t, db) }) @@ -114,7 +123,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index header", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.TestPublishAndIndexHeaderNonCanonical(t, db) }) @@ -122,7 +130,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index transactions", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexTransactionsNonCanonical(t, db) }) @@ -130,7 +137,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index receipts", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexReceiptsNonCanonical(t, db) }) @@ -138,7 +144,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index logs", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexLogsNonCanonical(t, db) }) @@ -146,7 +151,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index state nodes", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexStateNonCanonical(t, db) }) @@ -154,7 +158,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) { t.Run("Publish and index storage nodes", func(t *testing.T) { setupCSVNonCanonical(t) dumpCSVFileData(t) - defer tearDownCSV(t) test.DoTestPublishAndIndexStorageNonCanonical(t, db) }) diff --git a/indexer/database/file/csv_writer.go b/indexer/database/file/csv_writer.go index a634906..ad8a034 100644 --- a/indexer/database/file/csv_writer.go +++ b/indexer/database/file/csv_writer.go @@ -36,22 +36,8 @@ import ( sdtypes "github.com/cerc-io/plugeth-statediff/types" ) -var ( - Tables = []*schema.Table{ - &schema.TableIPLDBlock, - &schema.TableNodeInfo, - &schema.TableHeader, - &schema.TableStateNode, - &schema.TableStorageNode, - &schema.TableUncle, - &schema.TableTransaction, - &schema.TableReceipt, - &schema.TableLog, - } -) - type tableRow struct { - table schema.Table + table *schema.Table values []interface{} } @@ -134,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) } - writers, err := makeFileWriters(path, Tables) + writers, err := makeFileWriters(path, schema.Tables) if err != nil { return nil, err } @@ -164,7 +150,7 @@ func (csw *CSVWriter) Loop() { for { select { case row := <-csw.rows: - err := csw.writers.write(&row.table, row.values...) + err := csw.writers.write(row.table, row.values...) if err != nil { panic(fmt.Sprintf("error writing csv buffer: %v", err)) } @@ -204,13 +190,13 @@ func (csw *CSVWriter) Close() error { func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { var values []interface{} values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) - csw.rows <- tableRow{schema.TableNodeInfo, values} + csw.rows <- tableRow{&schema.TableNodeInfo, values} } func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) { var values []interface{} values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data) - csw.rows <- tableRow{schema.TableIPLDBlock, values} + csw.rows <- tableRow{&schema.TableIPLDBlock, values} } func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { @@ -231,11 +217,25 @@ func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { var values []interface{} - values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, - header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase, - header.Canonical) - csw.rows <- tableRow{schema.TableHeader, values} + values = append(values, + header.BlockNumber, + header.BlockHash, + header.ParentHash, + header.CID, + header.TotalDifficulty, + header.NodeIDs, + header.Reward, + header.StateRoot, + header.TxRoot, + header.RctRoot, + header.UnclesHash, + header.Bloom, + strconv.FormatUint(header.Timestamp, 10), + header.Coinbase, + header.Canonical, + header.WithdrawalsRoot, + ) + csw.rows <- tableRow{&schema.TableHeader, values} metrics.IndexerMetrics.BlocksCounter.Inc(1) } @@ -243,14 +243,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) { var values []interface{} values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.Index) - csw.rows <- tableRow{schema.TableUncle, values} + csw.rows <- tableRow{&schema.TableUncle, values} } func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { var values []interface{} values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.Type, transaction.Value) - csw.rows <- tableRow{schema.TableTransaction, values} + csw.rows <- tableRow{&schema.TableTransaction, values} metrics.IndexerMetrics.TransactionsCounter.Inc(1) } @@ -258,7 +258,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) { var values []interface{} values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, rct.PostState, rct.PostStatus) - csw.rows <- tableRow{schema.TableReceipt, values} + csw.rows <- tableRow{&schema.TableReceipt, values} metrics.IndexerMetrics.ReceiptsCounter.Inc(1) } @@ -267,11 +267,26 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) { var values []interface{} values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, l.Topic1, l.Topic2, l.Topic3) - csw.rows <- tableRow{schema.TableLog, values} + csw.rows <- tableRow{&schema.TableLog, values} metrics.IndexerMetrics.LogsCounter.Inc(1) } } +func (csw *CSVWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) { + var values []interface{} + values = append(values, + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount, + ) + csw.rows <- tableRow{&schema.TableWithdrawal, values} + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) +} + func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { balance := stateNode.Balance if stateNode.Removed { @@ -281,14 +296,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { var values []interface{} values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) - csw.rows <- tableRow{schema.TableStateNode, values} + csw.rows <- tableRow{&schema.TableStateNode, values} } func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { var values []interface{} values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, csw.isDiff, storageCID.Value, storageCID.Removed) - csw.rows <- tableRow{schema.TableStorageNode, values} + csw.rows <- tableRow{&schema.TableStorageNode, values} } // LoadWatchedAddresses loads watched addresses from a file diff --git a/indexer/database/file/indexer.go b/indexer/database/file/indexer.go index ccd3f0e..e6bfa52 100644 --- a/indexer/database/file/indexer.go +++ b/indexer/database/file/indexer.go @@ -84,7 +84,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) { return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir) } - log.Info("Writing statediff CSV files to directory", "file", outputDir) + log.Info("Writing statediff CSV files", "directory", outputDir) if watchedAddressesFilePath == "" { watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath @@ -156,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) } @@ -197,15 +197,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) t = time.Now() - // write receipts and txs - err = sdi.processReceiptsAndTxs(processArgs{ + err = sdi.processObjects(processArgs{ headerID: headerID, blockNumber: block.Number(), receipts: receipts, txs: transactions, + withdrawals: block.Withdrawals(), rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -222,7 +223,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip // it returns the headerID func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { // Process the header - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -245,6 +246,7 @@ func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), }) return headerID, nil } @@ -293,13 +295,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file -func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { +// processObjects writes receipt and tx IPLD insert SQL stmts to a file +func (sdi *StateDiffIndexer) processObjects(args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -376,6 +380,21 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { } sdi.fileWriter.upsertLogCID(logDataSet) } + // Process withdrawals + for i, wd := range args.withdrawals { + wdNode := args.wdNodes[i] + sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: wd.Index, + Validator: wd.Validator, + Address: wd.Address.String(), + Amount: wd.Amount, + } + sdi.fileWriter.upsertWithdrawalCID(wdModel) + } return nil } diff --git a/indexer/database/file/interfaces.go b/indexer/database/file/interfaces.go index ba38954..9ae4a8b 100644 --- a/indexer/database/file/interfaces.go +++ b/indexer/database/file/interfaces.go @@ -41,6 +41,7 @@ type FileWriter interface { upsertTransactionCID(transaction models.TxModel) upsertReceiptCID(rct *models.ReceiptModel) upsertLogCID(logs []*models.LogsModel) + upsertWithdrawalCID(models.WithdrawalModel) upsertStateCID(stateNode models.StateNodeModel) upsertStorageCID(storageCID models.StorageNodeModel) upsertIPLD(ipld models.IPLDModel) diff --git a/indexer/database/file/sql_indexer_test.go b/indexer/database/file/sql_indexer_test.go index 688ae70..7f8a14e 100644 --- a/indexer/database/file/sql_indexer_test.go +++ b/indexer/database/file/sql_indexer_test.go @@ -93,6 +93,14 @@ func TestSQLFileIndexer(t *testing.T) { test.DoTestPublishAndIndexReceiptIPLDs(t, db) }) + t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) { + setup(t) + dumpFileData(t) + defer tearDown(t) + + test.DoTestPublishAndIndexWithdrawalIPLDs(t, db) + }) + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { setup(t) dumpFileData(t) diff --git a/indexer/database/file/sql_writer.go b/indexer/database/file/sql_writer.go index 5326b6f..b4430b0 100644 --- a/indexer/database/file/sql_writer.go +++ b/indexer/database/file/sql_writer.go @@ -32,6 +32,7 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/models" nodeinfo "github.com/cerc-io/plugeth-statediff/indexer/node" + "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" "github.com/cerc-io/plugeth-statediff/types" ) @@ -145,8 +146,8 @@ const ( ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + - "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " + - "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n" + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES " + + "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t, '%s');\n" uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" @@ -167,6 +168,10 @@ const ( "removed, diff, val) VALUES ('%s', '%s', '%s', '%s', '%s', %t, %t, '\\x%x');\n" ) +var ( + withdrawalsInsert = schema.TableWithdrawal.FmtStringInsert() + ";\n" +) + func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) } @@ -192,9 +197,24 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) { } func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { - stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, - header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, - header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical) + stmt := fmt.Sprintf(headerInsert, + header.BlockNumber, + header.BlockHash, + header.ParentHash, + header.CID, + header.TotalDifficulty, + formatPostgresStringArray(header.NodeIDs), + header.Reward, + header.StateRoot, + header.TxRoot, + header.RctRoot, + header.UnclesHash, + header.Bloom, + header.Timestamp, + header.Coinbase, + header.Canonical, + header.WithdrawalsRoot, + ) sqw.stmts <- []byte(stmt) metrics.IndexerMetrics.BlocksCounter.Inc(1) } @@ -224,6 +244,19 @@ func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) { } } +func (sqw *SQLWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) { + sqw.stmts <- []byte(fmt.Sprintf(withdrawalsInsert, + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount, + )) + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) +} + func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { balance := stateNode.Balance if stateNode.Removed { diff --git a/indexer/database/metrics/metrics.go b/indexer/database/metrics/metrics.go index 92e56e6..97527aa 100644 --- a/indexer/database/metrics/metrics.go +++ b/indexer/database/metrics/metrics.go @@ -56,6 +56,8 @@ type IndexerMetricsHandles struct { ReceiptsCounter metrics.Counter // The total number of processed logs LogsCounter metrics.Counter + // The total number of processed logs + WithdrawalsCounter metrics.Counter // The total number of access list entries processed AccessListEntriesCounter metrics.Counter // Time spent waiting for free postgres tx @@ -90,6 +92,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { TransactionsCounter: metrics.NewCounter(), ReceiptsCounter: metrics.NewCounter(), LogsCounter: metrics.NewCounter(), + WithdrawalsCounter: metrics.NewCounter(), AccessListEntriesCounter: metrics.NewCounter(), FreePostgresTimer: metrics.NewTimer(), PostgresCommitTimer: metrics.NewTimer(), @@ -113,6 +116,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) + reg.Register(metricName(subsys, "withdrawals"), ctx.WithdrawalsCounter) reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) diff --git a/indexer/database/sql/indexer.go b/indexer/database/sql/indexer.go index 8ac4a98..776e920 100644 --- a/indexer/database/sql/indexer.go +++ b/indexer/database/sql/indexer.go @@ -105,7 +105,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } // Generate the block iplds - txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) + txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts) if err != nil { return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err) } @@ -148,16 +148,18 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t)) t = time.Now() - // Publish and index receipts and txs - err = sdi.processReceiptsAndTxs(batch, processArgs{ + + err = sdi.processObjects(batch, processArgs{ headerID: headerID, blockNumber: block.Number(), blockTime: block.Time(), receipts: receipts, + withdrawals: block.Withdrawals(), txs: transactions, rctNodes: rctNodes, txNodes: txNodes, logNodes: logNodes, + wdNodes: wdNodes, }) if err != nil { return nil, err @@ -185,7 +187,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) } // Process the header - headerNode, err := ipld.NewEthHeader(header) + headerNode, err := ipld.EncodeHeader(header) if err != nil { return "", err } @@ -208,6 +210,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He Timestamp: header.Time, Coinbase: header.Coinbase.String(), Canonical: true, + WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash), }) } @@ -258,13 +261,15 @@ type processArgs struct { blockTime uint64 receipts types.Receipts txs types.Transactions - rctNodes []*ipld.EthReceipt - txNodes []*ipld.EthTx - logNodes [][]*ipld.EthLog + withdrawals types.Withdrawals + rctNodes []ipld.IPLD + txNodes []ipld.IPLD + logNodes [][]ipld.IPLD + wdNodes []ipld.IPLD } -// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres -func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { +// processObjects publishes and indexes receipt and transaction IPLDs in Postgres +func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error { // Process receipts and txs signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) for i, receipt := range args.receipts { @@ -348,7 +353,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs return err } } - + // Process withdrawals + for i, withdrawal := range args.withdrawals { + wdNode := args.wdNodes[i] + tx.cacheIPLD(wdNode) + wdModel := models.WithdrawalModel{ + BlockNumber: args.blockNumber.String(), + HeaderID: args.headerID, + CID: wdNode.Cid().String(), + Index: withdrawal.Index, + Validator: withdrawal.Validator, + Address: withdrawal.Address.String(), + Amount: withdrawal.Amount, + } + if err := sdi.dbWriter.upsertWithdrawalCID(tx.dbtx, wdModel); err != nil { + return err + } + } return nil } diff --git a/indexer/database/sql/interfaces.go b/indexer/database/sql/interfaces.go index 845f603..e8c732b 100644 --- a/indexer/database/sql/interfaces.go +++ b/indexer/database/sql/interfaces.go @@ -54,6 +54,7 @@ type Statements interface { InsertTxStm() string InsertRctStm() string InsertLogStm() string + InsertWithdrawalStm() string InsertStateStm() string InsertStorageStm() string InsertIPLDStm() string diff --git a/indexer/database/sql/pgx_indexer_test.go b/indexer/database/sql/pgx_indexer_test.go index 944d682..c27fd8e 100644 --- a/indexer/database/sql/pgx_indexer_test.go +++ b/indexer/database/sql/pgx_indexer_test.go @@ -96,6 +96,14 @@ func TestPGXIndexer(t *testing.T) { test.DoTestPublishAndIndexReceiptIPLDs(t, db) }) + t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) { + setupPGX(t) + defer tearDown(t) + defer checkTxClosure(t, 1, 0, 1) + + test.DoTestPublishAndIndexWithdrawalIPLDs(t, db) + }) + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { setupPGX(t) defer tearDown(t) diff --git a/indexer/database/sql/postgres/database.go b/indexer/database/sql/postgres/database.go index f73b882..0249778 100644 --- a/indexer/database/sql/postgres/database.go +++ b/indexer/database/sql/postgres/database.go @@ -18,6 +18,7 @@ package postgres import ( "fmt" + "strings" "github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" @@ -43,7 +44,9 @@ type DB struct { // MaxHeaderStm satisfies the sql.Statements interface func (db *DB) MaxHeaderStm() string { - return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) + return fmt.Sprintf("SELECT %s FROM %s ORDER BY block_number DESC LIMIT 1", + strings.Join(schema.TableHeader.ColumnNames(), ","), + schema.TableHeader.Name) } // ExistsHeaderStm satisfies the sql.Statements interface @@ -59,7 +62,7 @@ func (db *DB) DetectGapsStm() string { // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { - return schema.TableHeader.ToInsertStatement(db.upsert) + return schema.TableHeader.PreparedInsert(db.upsert) } // SetCanonicalHeaderStm satisfies the sql.Statements interface @@ -70,37 +73,42 @@ func (db *DB) SetCanonicalHeaderStm() string { // InsertUncleStm satisfies the sql.Statements interface func (db *DB) InsertUncleStm() string { - return schema.TableUncle.ToInsertStatement(db.upsert) + return schema.TableUncle.PreparedInsert(db.upsert) } // InsertTxStm satisfies the sql.Statements interface func (db *DB) InsertTxStm() string { - return schema.TableTransaction.ToInsertStatement(db.upsert) + return schema.TableTransaction.PreparedInsert(db.upsert) } // InsertRctStm satisfies the sql.Statements interface func (db *DB) InsertRctStm() string { - return schema.TableReceipt.ToInsertStatement(db.upsert) + return schema.TableReceipt.PreparedInsert(db.upsert) } // InsertLogStm satisfies the sql.Statements interface func (db *DB) InsertLogStm() string { - return schema.TableLog.ToInsertStatement(db.upsert) + return schema.TableLog.PreparedInsert(db.upsert) +} + +// InsertLogStm satisfies the sql.Statements interface +func (db *DB) InsertWithdrawalStm() string { + return schema.TableWithdrawal.PreparedInsert(db.upsert) } // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return schema.TableStateNode.ToInsertStatement(db.upsert) + return schema.TableStateNode.PreparedInsert(db.upsert) } // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return schema.TableStorageNode.ToInsertStatement(db.upsert) + return schema.TableStorageNode.PreparedInsert(db.upsert) } // InsertIPLDStm satisfies the sql.Statements interface func (db *DB) InsertIPLDStm() string { - return schema.TableIPLDBlock.ToInsertStatement(db.upsert) + return schema.TableIPLDBlock.PreparedInsert(db.upsert) } // InsertIPLDsStm satisfies the sql.Statements interface diff --git a/indexer/database/sql/sqlx_indexer_test.go b/indexer/database/sql/sqlx_indexer_test.go index fb099ef..81917ee 100644 --- a/indexer/database/sql/sqlx_indexer_test.go +++ b/indexer/database/sql/sqlx_indexer_test.go @@ -82,6 +82,14 @@ func TestSQLXIndexer(t *testing.T) { test.DoTestPublishAndIndexReceiptIPLDs(t, db) }) + t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) { + setupSQLX(t) + defer tearDown(t) + defer checkTxClosure(t, 0, 0, 0) + + test.DoTestPublishAndIndexWithdrawalIPLDs(t, db) + }) + t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { setupSQLX(t) defer tearDown(t) diff --git a/indexer/database/sql/writer.go b/indexer/database/sql/writer.go index 752761b..3373166 100644 --- a/indexer/database/sql/writer.go +++ b/indexer/database/sql/writer.go @@ -95,6 +95,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) { &model.Timestamp, &model.Coinbase, &model.Canonical, + &model.WithdrawalsRoot, ) model.BlockNumber = strconv.FormatUint(number, 10) model.TotalDifficulty = strconv.FormatUint(td, 10) @@ -125,6 +126,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.Timestamp, header.Coinbase, header.Canonical, + header.WithdrawalsRoot, ) if err != nil { return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} @@ -286,6 +288,41 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { return nil } +func (w *Writer) upsertWithdrawalCID(tx Tx, withdrawal models.WithdrawalModel) error { + if w.useCopyForTx(tx) { + blockNum, err := strconv.ParseUint(withdrawal.BlockNumber, 10, 64) + if err != nil { + return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal} + } + + _, err = tx.CopyFrom(w.db.Context(), schema.TableWithdrawal.TableName(), schema.TableWithdrawal.ColumnNames(), + toRows(toRow(blockNum, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount))) + if err != nil { + return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal} + } + } else { + _, err := tx.Exec(w.db.Context(), w.db.InsertWithdrawalStm(), + withdrawal.BlockNumber, + withdrawal.HeaderID, + withdrawal.CID, + withdrawal.Index, + withdrawal.Validator, + withdrawal.Address, + withdrawal.Amount) + if err != nil { + return insertError{"eth.withdrawal_cids", err, w.db.InsertWithdrawalStm(), withdrawal} + } + } + metrics.IndexerMetrics.WithdrawalsCounter.Inc(1) + return nil +} + /* INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, removed, diff, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (header_id, state_leaf_key, block_number) DO NOTHING diff --git a/indexer/ipld/encode.go b/indexer/ipld/encode.go new file mode 100644 index 0000000..8108f8e --- /dev/null +++ b/indexer/ipld/encode.go @@ -0,0 +1,102 @@ +// VulcanizeDB +// Copyright © 2024 Vulcanize + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package ipld + +import ( + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + mh "github.com/multiformats/go-multihash" +) + +// EncodeHeader converts a *types.Header into an IPLD node +func EncodeHeader(header *types.Header) (IPLD, error) { + headerRLP, err := rlp.EncodeToBytes(header) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: headerRLP, + }, nil +} + +// encodeTx converts a *types.Transaction to an IPLD node +func encodeTx(tx *types.Transaction) (IPLD, error) { + txRaw, err := tx.MarshalBinary() + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: txRaw, + }, nil +} + +// encodeReceipt converts a types.Receipt to an IPLD node +func encodeReceipt(receipt *types.Receipt) (IPLD, error) { + rctRaw, err := receipt.MarshalBinary() + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: rctRaw, + }, nil +} + +// encodeLog converts a Log to an IPLD node +func encodeLog(log *types.Log) (IPLD, error) { + logRaw, err := rlp.EncodeToBytes(log) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: logRaw, + }, nil +} + +func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) { + wRaw, err := rlp.EncodeToBytes(w) + if err != nil { + return nil, err + } + c, err := RawdataToCid(MEthWithdrawal, wRaw, mh.KECCAK_256) + if err != nil { + return nil, err + } + return &node{ + cid: c, + rawdata: wRaw, + }, nil +} diff --git a/indexer/ipld/eth_header.go b/indexer/ipld/eth_header.go deleted file mode 100644 index d71ea4d..0000000 --- a/indexer/ipld/eth_header.go +++ /dev/null @@ -1,60 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" -) - -// EthHeader (eth-block, codec 0x90), represents an ethereum block header -type EthHeader struct { - cid cid.Cid - rawdata []byte -} - -// Static (compile time) check that EthHeader satisfies the node.Node interface. -var _ IPLD = (*EthHeader)(nil) - -// NewEthHeader converts a *types.Header into an EthHeader IPLD node -func NewEthHeader(header *types.Header) (*EthHeader, error) { - headerRLP, err := rlp.EncodeToBytes(header) - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthHeader{ - cid: c, - rawdata: headerRLP, - }, nil -} - -// RawData returns the binary of the RLP encode of the block header. -func (b *EthHeader) RawData() []byte { - return b.rawdata -} - -// Cid returns the cid of the block header. -func (b *EthHeader) Cid() cid.Cid { - return b.cid -} diff --git a/indexer/ipld/eth_log.go b/indexer/ipld/eth_log.go deleted file mode 100644 index 71db98a..0000000 --- a/indexer/ipld/eth_log.go +++ /dev/null @@ -1,43 +0,0 @@ -package ipld - -import ( - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" -) - -// EthLog (eth-log, codec 0x9a), represents an ethereum block header -type EthLog struct { - rawData []byte - cid cid.Cid -} - -// Static (compile time) check that EthLog satisfies the node.Node interface. -var _ IPLD = (*EthLog)(nil) - -// NewLog create a new EthLog IPLD node -func NewLog(log *types.Log) (*EthLog, error) { - logRaw, err := rlp.EncodeToBytes(log) - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthLog{ - cid: c, - rawData: logRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the log. -func (l *EthLog) RawData() []byte { - return l.rawData -} - -// Cid returns the cid of the receipt log. -func (l *EthLog) Cid() cid.Cid { - return l.cid -} diff --git a/indexer/ipld/eth_parser.go b/indexer/ipld/eth_parser.go index 5ec8bf9..c247df6 100644 --- a/indexer/ipld/eth_parser.go +++ b/indexer/ipld/eth_parser.go @@ -22,25 +22,29 @@ import ( // FromBlockAndReceipts takes a block and processes it // to return it a set of IPLD nodes for further processing. -func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) { +func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD, []IPLD, [][]IPLD, []IPLD, error) { // Process the txs txNodes, err := processTransactions(block.Transactions()) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err + } + withdrawalNodes, err := processWithdrawals(block.Withdrawals()) + if err != nil { + return nil, nil, nil, nil, err } // Process the receipts and logs rctNodes, logNodes, err := processReceiptsAndLogs(receipts) - return txNodes, rctNodes, logNodes, err + return txNodes, rctNodes, logNodes, withdrawalNodes, err } // processTransactions will take the found transactions in a parsed block body // to return IPLD node slices for eth-tx -func processTransactions(txs []*types.Transaction) ([]*EthTx, error) { - var ethTxNodes []*EthTx +func processTransactions(txs []*types.Transaction) ([]IPLD, error) { + var ethTxNodes []IPLD for _, tx := range txs { - ethTx, err := NewEthTx(tx) + ethTx, err := encodeTx(tx) if err != nil { return nil, err } @@ -50,12 +54,25 @@ func processTransactions(txs []*types.Transaction) ([]*EthTx, error) { return ethTxNodes, nil } +func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) { + var withdrawalNodes []IPLD + for _, withdrawal := range withdrawals { + ethW, err := encodeWithdrawal(withdrawal) + if err != nil { + return nil, err + } + withdrawalNodes = append(withdrawalNodes, ethW) + } + + return withdrawalNodes, nil +} + // processReceiptsAndLogs will take in receipts // to return IPLD node slices for eth-rct and eth-log -func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, error) { +func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) { // Pre allocating memory. - ethRctNodes := make([]*EthReceipt, len(rcts)) - ethLogNodes := make([][]*EthLog, len(rcts)) + ethRctNodes := make([]IPLD, len(rcts)) + ethLogNodes := make([][]IPLD, len(rcts)) for idx, rct := range rcts { logNodes, err := processLogs(rct.Logs) @@ -63,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, return nil, nil, err } - ethRct, err := NewReceipt(rct) + ethRct, err := encodeReceipt(rct) if err != nil { return nil, nil, err } @@ -75,10 +92,10 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, return ethRctNodes, ethLogNodes, nil } -func processLogs(logs []*types.Log) ([]*EthLog, error) { - logNodes := make([]*EthLog, len(logs)) +func processLogs(logs []*types.Log) ([]IPLD, error) { + logNodes := make([]IPLD, len(logs)) for idx, log := range logs { - logNode, err := NewLog(log) + logNode, err := encodeLog(log) if err != nil { return nil, err } diff --git a/indexer/ipld/eth_parser_test.go b/indexer/ipld/eth_parser_test.go index 8deb260..fd44058 100644 --- a/indexer/ipld/eth_parser_test.go +++ b/indexer/ipld/eth_parser_test.go @@ -92,7 +92,7 @@ func loadBlockData(t *testing.T) []testCase { func TestFromBlockAndReceipts(t *testing.T) { testCases := loadBlockData(t) for _, tc := range testCases { - _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts) + _, _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts) if err != nil { t.Fatalf("error generating IPLDs from block and receipts, err %v, kind %s, block hash %s", err, tc.kind, tc.block.Hash()) } diff --git a/indexer/ipld/eth_receipt.go b/indexer/ipld/eth_receipt.go deleted file mode 100644 index eac2ba6..0000000 --- a/indexer/ipld/eth_receipt.go +++ /dev/null @@ -1,58 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" -) - -type EthReceipt struct { - rawdata []byte - cid cid.Cid -} - -// Static (compile time) check that EthReceipt satisfies the node.Node interface. -var _ IPLD = (*EthReceipt)(nil) - -// NewReceipt converts a types.ReceiptForStorage to an EthReceipt IPLD node -func NewReceipt(receipt *types.Receipt) (*EthReceipt, error) { - rctRaw, err := receipt.MarshalBinary() - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthReceipt{ - cid: c, - rawdata: rctRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the receipt. -func (r *EthReceipt) RawData() []byte { - return r.rawdata -} - -// Cid returns the cid of the receipt. -func (r *EthReceipt) Cid() cid.Cid { - return r.cid -} diff --git a/indexer/ipld/eth_tx.go b/indexer/ipld/eth_tx.go deleted file mode 100644 index ca5fe65..0000000 --- a/indexer/ipld/eth_tx.go +++ /dev/null @@ -1,59 +0,0 @@ -// VulcanizeDB -// Copyright © 2019 Vulcanize - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. - -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -package ipld - -import ( - "github.com/ipfs/go-cid" - mh "github.com/multiformats/go-multihash" - - "github.com/ethereum/go-ethereum/core/types" -) - -// EthTx (eth-tx codec 0x93) represents an ethereum transaction -type EthTx struct { - cid cid.Cid - rawdata []byte -} - -// Static (compile time) check that EthTx satisfies the node.Node interface. -var _ IPLD = (*EthTx)(nil) - -// NewEthTx converts a *types.Transaction to an EthTx IPLD node -func NewEthTx(tx *types.Transaction) (*EthTx, error) { - txRaw, err := tx.MarshalBinary() - if err != nil { - return nil, err - } - c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256) - if err != nil { - return nil, err - } - return &EthTx{ - cid: c, - rawdata: txRaw, - }, nil -} - -// RawData returns the binary of the RLP encode of the transaction. -func (t *EthTx) RawData() []byte { - return t.rawdata -} - -// Cid returns the cid of the transaction. -func (t *EthTx) Cid() cid.Cid { - return t.cid -} diff --git a/indexer/ipld/interface.go b/indexer/ipld/interface.go index 73a4bed..3909c63 100644 --- a/indexer/ipld/interface.go +++ b/indexer/ipld/interface.go @@ -2,7 +2,25 @@ package ipld import "github.com/ipfs/go-cid" +// Check that node satisfies the IPLD Node interface. +var _ IPLD = (*node)(nil) + +type node struct { + cid cid.Cid + rawdata []byte +} + type IPLD interface { Cid() cid.Cid RawData() []byte } + +// RawData returns the RLP encoded bytes of the node. +func (b node) RawData() []byte { + return b.rawdata +} + +// Cid returns the CID of the node. +func (b node) Cid() cid.Cid { + return b.cid +} diff --git a/indexer/ipld/shared.go b/indexer/ipld/shared.go index 7758f32..9d82e12 100644 --- a/indexer/ipld/shared.go +++ b/indexer/ipld/shared.go @@ -37,6 +37,7 @@ const ( MEthStorageTrie = 0x98 MEthLogTrie = 0x99 MEthLog = 0x9a + MEthWithdrawal = 0x9b // TODO add to multicodec registry ) // RawdataToCid takes the desired codec and a slice of bytes diff --git a/indexer/mocks/test_data.go b/indexer/mocks/test_data.go index b940207..39ab197 100644 --- a/indexer/mocks/test_data.go +++ b/indexer/mocks/test_data.go @@ -19,8 +19,8 @@ package mocks import ( "crypto/ecdsa" "crypto/elliptic" - "crypto/rand" "math/big" + "math/rand" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -39,6 +39,9 @@ import ( // Test variables var ( + // RNG for deterministically generated keys + rng = rand.New(rand.NewSource(0)) + // block data TestChainConfig = params.MainnetChainConfig BlockNumber = TestChainConfig.LondonBlock @@ -58,15 +61,19 @@ var ( Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"), } MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestChainConfig, BlockNumber, BlockTime) - MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts, trie.NewEmpty(nil)) - MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header()) + MockWithdrawals = types.Withdrawals{ + {Index: 0, Validator: 1, Address: Address, Amount: 1000000000}, + {Index: 1, Validator: 5, Address: AnotherAddress, Amount: 2000000000}, + } + MockBlock = types.NewBlockWithWithdrawals(&MockHeader, MockTransactions, nil, MockReceipts, MockWithdrawals, trie.NewEmpty(nil)) + MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header()) // non-canonical block at London height // includes 2nd and 5th transactions from the canonical block MockNonCanonicalHeader = MockHeader MockNonCanonicalBlockTransactions = types.Transactions{MockTransactions[1], MockTransactions[4]} MockNonCanonicalBlockReceipts = createNonCanonicalBlockReceipts(TestChainConfig, BlockNumber, BlockTime, MockNonCanonicalBlockTransactions) - MockNonCanonicalBlock = types.NewBlock(&MockNonCanonicalHeader, MockNonCanonicalBlockTransactions, nil, MockNonCanonicalBlockReceipts, trie.NewEmpty(nil)) + MockNonCanonicalBlock = types.NewBlockWithWithdrawals(&MockNonCanonicalHeader, MockNonCanonicalBlockTransactions, nil, MockNonCanonicalBlockReceipts, MockWithdrawals[:1], trie.NewEmpty(nil)) MockNonCanonicalHeaderRlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock.Header()) // non-canonical block at London height + 1 @@ -86,7 +93,7 @@ var ( } MockNonCanonicalBlock2Transactions = types.Transactions{MockTransactions[2], MockTransactions[4]} MockNonCanonicalBlock2Receipts = createNonCanonicalBlockReceipts(TestChainConfig, Block2Number, BlockTime, MockNonCanonicalBlock2Transactions) - MockNonCanonicalBlock2 = types.NewBlock(&MockNonCanonicalHeader2, MockNonCanonicalBlock2Transactions, nil, MockNonCanonicalBlock2Receipts, trie.NewEmpty(nil)) + MockNonCanonicalBlock2 = types.NewBlockWithWithdrawals(&MockNonCanonicalHeader2, MockNonCanonicalBlock2Transactions, nil, MockNonCanonicalBlock2Receipts, types.Withdrawals{}, trie.NewEmpty(nil)) MockNonCanonicalHeader2Rlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock2.Header()) Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") @@ -348,7 +355,10 @@ func NewLegacyData(config *params.ChainConfig) *LegacyData { mockTransactions, mockReceipts, senderAddr := createLegacyTransactionsAndReceipts(config, blockNumber) mockBlock := types.NewBlock(&mockHeader, mockTransactions, nil, mockReceipts, trie.NewEmpty(nil)) - mockHeaderRlp, _ := rlp.EncodeToBytes(mockBlock.Header()) + mockHeaderRlp, err := rlp.EncodeToBytes(mockBlock.Header()) + if err != nil { + panic(err) + } contractAddress := crypto.CreateAddress(senderAddr, mockTransactions[2].Nonce()) return &LegacyData{ @@ -388,7 +398,7 @@ func createLegacyTransactionsAndReceipts(config *params.ChainConfig, blockNumber blockTime := uint64(0) transactionSigner := types.MakeSigner(config, blockNumber, blockTime) mockCurve := elliptic.P256() - mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) + mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng) if err != nil { log.Crit(err.Error()) } @@ -460,7 +470,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big. transactionSigner := types.MakeSigner(config, blockNumber, blockTime) mockCurve := elliptic.P256() - mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) + mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng) if err != nil { log.Crit(err.Error()) } @@ -524,7 +534,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big. func createNonCanonicalBlockReceipts(config *params.ChainConfig, blockNumber *big.Int, blockTime uint64, transactions types.Transactions) types.Receipts { transactionSigner := types.MakeSigner(config, blockNumber, blockTime) mockCurve := elliptic.P256() - mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) + mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng) if err != nil { log.Crit(err.Error()) } diff --git a/indexer/models/models.go b/indexer/models/models.go index 0fcc964..124b431 100644 --- a/indexer/models/models.go +++ b/indexer/models/models.go @@ -42,6 +42,7 @@ type HeaderModel struct { Timestamp uint64 `db:"timestamp"` Coinbase string `db:"coinbase"` Canonical bool `db:"canonical"` + WithdrawalsRoot string `db:"withdrawals_root"` } // UncleModel is the db model for eth.uncle_cids @@ -105,7 +106,7 @@ type StorageNodeModel struct { Value []byte `db:"val"` } -// LogsModel is the db model for eth.logs +// LogsModel is the db model for eth.log_cids type LogsModel struct { BlockNumber string `db:"block_number"` HeaderID string `db:"header_id"` @@ -118,3 +119,14 @@ type LogsModel struct { Topic2 string `db:"topic2"` Topic3 string `db:"topic3"` } + +// WithdrawalModel is the db model for eth.withdrawal_cids +type WithdrawalModel struct { + BlockNumber string `db:"block_number"` + HeaderID string `db:"header_id"` + CID string `db:"cid"` + Index uint64 `db:"index"` + Validator uint64 `db:"validator"` + Address string `db:"address"` + Amount uint64 `db:"amount"` +} diff --git a/indexer/shared/functions.go b/indexer/shared/functions.go index 58306bd..1797fc2 100644 --- a/indexer/shared/functions.go +++ b/indexer/shared/functions.go @@ -35,3 +35,12 @@ func HandleZeroAddr(to common.Address) string { } return to.String() } + +// MaybeStringHash calls String on its argument and returns a pointer to the result. +// When passed nil, it returns nil. +func MaybeStringHash(hash *common.Hash) string { + if hash == nil { + return "" + } + return hash.String() +} diff --git a/indexer/shared/schema/schema.go b/indexer/shared/schema/schema.go index 1fbc54e..4db2baa 100644 --- a/indexer/shared/schema/schema.go +++ b/indexer/shared/schema/schema.go @@ -16,6 +16,19 @@ package schema +var Tables = []*Table{ + &TableIPLDBlock, + &TableNodeInfo, + &TableHeader, + &TableStateNode, + &TableStorageNode, + &TableUncle, + &TableTransaction, + &TableReceipt, + &TableLog, + &TableWithdrawal, +} + var TableIPLDBlock = Table{ Name: `ipld.blocks`, Columns: []Column{ @@ -52,9 +65,10 @@ var TableHeader = Table{ {Name: "receipt_root", Type: Dvarchar}, {Name: "uncles_hash", Type: Dvarchar}, {Name: "bloom", Type: Dbytea}, - {Name: "timestamp", Type: Dnumeric}, + {Name: "timestamp", Type: Dbigint}, {Name: "coinbase", Type: Dvarchar}, {Name: "canonical", Type: Dboolean}, + {Name: "withdrawals_root", Type: Dvarchar}, }, UpsertClause: OnConflict("block_number", "block_hash").Set( "parent_hash", @@ -70,6 +84,7 @@ var TableHeader = Table{ "timestamp", "coinbase", "canonical", + "withdrawals_root", )} var TableStateNode = Table{ @@ -165,6 +180,20 @@ var TableLog = Table{ UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"), } +var TableWithdrawal = Table{ + Name: "eth.withdrawal_cids", + Columns: []Column{ + {Name: "block_number", Type: Dbigint}, + {Name: "header_id", Type: Dvarchar}, + {Name: "cid", Type: Dtext}, + {Name: "index", Type: Dinteger}, + {Name: "validator", Type: Dinteger}, + {Name: "address", Type: Dvarchar}, + {Name: "amount", Type: Dinteger}, + }, + UpsertClause: OnConflict("block_number", "header_id", "index"), +} + var TableWatchedAddresses = Table{ Name: "eth_meta.watched_addresses", Columns: []Column{ diff --git a/indexer/shared/schema/table.go b/indexer/shared/schema/table.go index bf6968e..9cc1c2d 100644 --- a/indexer/shared/schema/table.go +++ b/indexer/shared/schema/table.go @@ -53,34 +53,6 @@ type Table struct { UpsertClause ConflictClause } -type colfmt = func(interface{}) string - -func (tbl *Table) ToCsvRow(args ...interface{}) []string { - var row []string - for i, col := range tbl.Columns { - value := col.Type.formatter()(args[i]) - - if col.Array { - valueList := funk.Map(args[i], col.Type.formatter()).([]string) - value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) - } - - row = append(row, value) - } - return row -} - -func (tbl *Table) VarcharColumns() []string { - columns := funk.Filter(tbl.Columns, func(col Column) bool { - return col.Type == Dvarchar - }).([]Column) - - columnNames := funk.Map(columns, func(col Column) string { - return col.Name - }).([]string) - return columnNames -} - func OnConflict(target ...string) ConflictClause { return ConflictClause{Target: target} } @@ -89,35 +61,6 @@ func (c ConflictClause) Set(fields ...string) ConflictClause { return c } -// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table -// using positional placeholders -func (tbl *Table) ToInsertStatement(upsert bool) string { - var colnames, placeholders []string - for i, col := range tbl.Columns { - colnames = append(colnames, col.Name) - placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) - } - suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", ")) - if upsert && len(tbl.UpsertClause.Update) != 0 { - var update_placeholders []string - for _, name := range tbl.UpsertClause.Update { - i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name }) - update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1)) - } - suffix += fmt.Sprintf( - " DO UPDATE SET (%s) = (%s)", - strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "), - ) - } else { - suffix += " DO NOTHING" - } - - return fmt.Sprintf( - "INSERT INTO %s (%s) VALUES (%s) %s", - tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix, - ) -} - // TableName returns a pgx-compatible table name. func (tbl *Table) TableName() []string { return strings.Split(tbl.Name, ".") @@ -132,11 +75,45 @@ func (tbl *Table) ColumnNames() []string { return names } +// PreparedInsert returns a pgx/sqlx-compatible SQL prepared insert statement for the table +// using positional placeholders. +// If upsert is true, include an ON CONFLICT clause handling column updates. +func (tbl *Table) PreparedInsert(upsert bool) string { + var colnames, placeholders []string + for i, col := range tbl.Columns { + colnames = append(colnames, col.Name) + placeholders = append(placeholders, fmt.Sprintf("$%d", i+1)) + } + suffix := " ON CONFLICT" + if len(tbl.UpsertClause.Target) > 0 { + suffix += fmt.Sprintf(" (%s)", strings.Join(tbl.UpsertClause.Target, ", ")) + } + if upsert && len(tbl.UpsertClause.Update) != 0 { + var update_placeholders []string + for _, name := range tbl.UpsertClause.Update { + update_placeholders = append(update_placeholders, "EXCLUDED."+name) + } + suffix += fmt.Sprintf( + " DO UPDATE SET (%s) = ROW(%s)", + strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "), + ) + } else { + suffix += " DO NOTHING" + } + + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s)", + tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), + ) + suffix +} + +type colfmt = func(interface{}) string + func sprintf(f string) colfmt { return func(x interface{}) string { return fmt.Sprintf(f, x) } } -func (typ colType) formatter() colfmt { +func (typ colType) csvFormatter() colfmt { switch typ { case Dinteger: return sprintf("%d") @@ -157,6 +134,61 @@ func (typ colType) formatter() colfmt { return sprintf("%s") case Dtext: return sprintf("%s") + default: + panic("invalid column type") } - panic("unreachable") +} + +// ToCsvRow converts a list of values to a list of strings suitable for CSV output. +func (tbl *Table) ToCsvRow(args ...interface{}) []string { + var row []string + for i, col := range tbl.Columns { + value := col.Type.csvFormatter()(args[i]) + + if col.Array { + valueList := funk.Map(args[i], col.Type.csvFormatter()).([]string) + value = fmt.Sprintf("{%s}", strings.Join(valueList, ",")) + } + + row = append(row, value) + } + return row +} + +// VarcharColumns returns the names of columns with type VARCHAR. +func (tbl *Table) VarcharColumns() []string { + columns := funk.Filter(tbl.Columns, func(col Column) bool { + return col.Type == Dvarchar + }).([]Column) + + columnNames := funk.Map(columns, func(col Column) string { + return col.Name + }).([]string) + return columnNames +} + +func formatSpec(typ colType) string { + switch typ { + case Dinteger: + return "%d" + case Dboolean: + return "%t" + case Dbytea: + return `'\x%x'` + default: + return "'%s'" + } +} + +// FmtStringInsert returns a format string for creating a Postgres insert statement. +func (tbl *Table) FmtStringInsert() string { + var colnames, placeholders []string + for _, col := range tbl.Columns { + colnames = append(colnames, col.Name) + placeholders = append(placeholders, formatSpec(col.Type)) + } + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s);", + tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), + ) } diff --git a/indexer/shared/schema/table_test.go b/indexer/shared/schema/table_test.go index aaa026b..f31e10b 100644 --- a/indexer/shared/schema/table_test.go +++ b/indexer/shared/schema/table_test.go @@ -8,47 +8,55 @@ import ( . "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" ) -var testHeaderTable = Table{ - Name: "eth.header_cids", - Columns: []Column{ - {Name: "block_number", Type: Dbigint}, - {Name: "block_hash", Type: Dvarchar}, - {Name: "parent_hash", Type: Dvarchar}, - {Name: "cid", Type: Dtext}, - {Name: "td", Type: Dnumeric}, - {Name: "node_id", Type: Dvarchar}, - {Name: "reward", Type: Dnumeric}, - {Name: "state_root", Type: Dvarchar}, - {Name: "tx_root", Type: Dvarchar}, - {Name: "receipt_root", Type: Dvarchar}, - {Name: "uncle_root", Type: Dvarchar}, - {Name: "bloom", Type: Dbytea}, - {Name: "timestamp", Type: Dnumeric}, - {Name: "mh_key", Type: Dtext}, - {Name: "times_validated", Type: Dinteger}, - {Name: "coinbase", Type: Dvarchar}, - }, - UpsertClause: OnConflict("block_hash", "block_number").Set( - "parent_hash", - "cid", - "td", - "node_id", - "reward", - "state_root", - "tx_root", - "receipt_root", - "uncle_root", - "bloom", - "timestamp", - "mh_key", - "times_validated", - "coinbase", - ), -} +var ( + testTable = Table{ + Name: "test_table", + Columns: []Column{ + {Name: "id", Type: Dbigint}, + {Name: "name", Type: Dvarchar}, + {Name: "age", Type: Dinteger}, + }, + } + testTableWithConflictClause = Table{ + Name: "test_table_conflict", + Columns: []Column{ + {Name: "id", Type: Dbigint}, + {Name: "name", Type: Dvarchar}, + {Name: "age", Type: Dinteger}, + }, + UpsertClause: OnConflict("id").Set("name", "age"), + } +) + +const ( + expectedHeaderPreparedWithUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) = ROW(EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_ids, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncles_hash, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.coinbase, EXCLUDED.canonical, EXCLUDED.withdrawals_root)" + + expectedHeaderPreparedWithoutUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO NOTHING" + + expectedHeaderFmtString = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\x%x', '%s', '%s', %t, '%s');` +) func TestTable(t *testing.T) { - headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` - headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING` - require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false)) - require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true)) + require.Equal(t, + "INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + testTable.PreparedInsert(true), + ) + require.Equal(t, + "INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", + testTable.PreparedInsert(false), + ) + require.Equal(t, "INSERT INTO test_table (id, name, age) VALUES ('%s', '%s', %d);", testTable.FmtStringInsert()) + + require.Equal(t, + "INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET (name, age) = ROW(EXCLUDED.name, EXCLUDED.age)", + testTableWithConflictClause.PreparedInsert(true), + ) + require.Equal(t, + "INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING", + testTableWithConflictClause.PreparedInsert(false), + ) + + require.Equal(t, expectedHeaderPreparedWithUpsert, TableHeader.PreparedInsert(true)) + require.Equal(t, expectedHeaderPreparedWithoutUpsert, TableHeader.PreparedInsert(false)) + require.Equal(t, expectedHeaderFmtString, TableHeader.FmtStringInsert()) } diff --git a/indexer/test/test.go b/indexer/test/test.go index 5f9cf7c..df86ec6 100644 --- a/indexer/test/test.go +++ b/indexer/test/test.go @@ -33,10 +33,9 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/mocks" "github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/shared" - "github.com/cerc-io/plugeth-statediff/indexer/test_helpers" ) -// SetupTestData indexes a single mock block along with it's state nodes +// SetupTestData indexes a single mock block along with its state nodes func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { var tx interfaces.Batch tx, err = ind.PushBlock( @@ -111,11 +110,11 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) { t.Fatal(err) } require.Equal(t, 5, len(trxs)) - expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) - expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) - expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) - expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String())) - expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String())) + require.Contains(t, trxs, trx1CID.String()) + require.Contains(t, trxs, trx2CID.String()) + require.Contains(t, trxs, trx3CID.String()) + require.Contains(t, trxs, trx4CID.String()) + require.Contains(t, trxs, trx5CID.String()) transactions := mocks.MockBlock.Transactions() type txResult struct { @@ -257,11 +256,11 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) { t.Fatal(err) } require.Equal(t, 5, len(rcts)) - expectTrue(t, test_helpers.ListContainsString(rcts, rct1CID.String())) - expectTrue(t, test_helpers.ListContainsString(rcts, rct2CID.String())) - expectTrue(t, test_helpers.ListContainsString(rcts, rct3CID.String())) - expectTrue(t, test_helpers.ListContainsString(rcts, rct4CID.String())) - expectTrue(t, test_helpers.ListContainsString(rcts, rct5CID.String())) + require.Contains(t, rcts, rct1CID.String()) + require.Contains(t, rcts, rct2CID.String()) + require.Contains(t, rcts, rct3CID.String()) + require.Contains(t, rcts, rct4CID.String()) + require.Contains(t, rcts, rct5CID.String()) for idx, c := range rcts { result := make([]models.IPLDModel, 0) @@ -335,6 +334,41 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) { } } +func DoTestPublishAndIndexWithdrawalIPLDs(t *testing.T, db sql.Database) { + // check that withdrawals were properly indexed and published + wds := make([]string, 0) + pgStr := `SELECT withdrawal_cids.cid FROM eth.withdrawal_cids + INNER JOIN eth.header_cids ON (withdrawal_cids.header_id = header_cids.block_hash) + WHERE header_cids.block_number = $1 + ORDER BY withdrawal_cids.index` + err = db.Select(context.Background(), &wds, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + require.Equal(t, 2, len(wds)) + require.Contains(t, wds, wd1CID.String()) + require.Contains(t, wds, wd2CID.String()) + + for _, c := range wds { + dc, err := cid.Decode(c) + if err != nil { + t.Fatal(err) + } + var data []byte + err = db.Get(context.Background(), &data, ipfsPgGet, dc.String(), mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + + switch c { + case wd1CID.String(): + require.Equal(t, wd1, data) + case wd2CID.String(): + require.Equal(t, wd2, data) + } + } +} + func DoTestPublishAndIndexStateIPLDs(t *testing.T, db sql.Database) { // check that state nodes were properly indexed and published stateNodes := make([]models.StateNodeModel, 0) @@ -594,7 +628,7 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) { func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) { // check indexed headers pgStr := `SELECT CAST(block_number as TEXT), block_hash, cid, cast(td AS TEXT), cast(reward AS TEXT), - tx_root, receipt_root, uncles_hash, coinbase + tx_root, receipt_root, uncles_hash, coinbase, withdrawals_root FROM eth.header_cids ORDER BY block_number` headerRes := make([]models.HeaderModel, 0) @@ -616,6 +650,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) { RctRoot: mockBlock.ReceiptHash().String(), UnclesHash: mockBlock.UncleHash().String(), Coinbase: mocks.MockHeader.Coinbase.String(), + WithdrawalsRoot: shared.MaybeStringHash(mockBlock.Header().WithdrawalsHash), }, { BlockNumber: mockNonCanonicalBlock.Number().String(), @@ -626,6 +661,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) { RctRoot: mockNonCanonicalBlock.ReceiptHash().String(), UnclesHash: mockNonCanonicalBlock.UncleHash().String(), Coinbase: mocks.MockNonCanonicalHeader.Coinbase.String(), + WithdrawalsRoot: shared.MaybeStringHash(mockNonCanonicalBlock.Header().WithdrawalsHash), }, { BlockNumber: mockNonCanonicalBlock2.Number().String(), @@ -636,6 +672,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) { RctRoot: mockNonCanonicalBlock2.ReceiptHash().String(), UnclesHash: mockNonCanonicalBlock2.UncleHash().String(), Coinbase: mocks.MockNonCanonicalHeader2.Coinbase.String(), + WithdrawalsRoot: shared.MaybeStringHash(mockNonCanonicalBlock2.Header().WithdrawalsHash), }, } expectedRes[0].Reward = shared.CalcEthBlockReward(mockBlock.Header(), mockBlock.Uncles(), mockBlock.Transactions(), mocks.MockReceipts).String() diff --git a/indexer/test/test_init.go b/indexer/test/test_init.go index 1a8b70f..ebcbe2a 100644 --- a/indexer/test/test_init.go +++ b/indexer/test/test_init.go @@ -38,12 +38,14 @@ var ( watchedAddressesPgGet = `SELECT * FROM eth_meta.watched_addresses` tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte + wd1, wd2 []byte nonCanonicalBlockRct1, nonCanonicalBlockRct2 []byte nonCanonicalBlock2Rct1, nonCanonicalBlock2Rct2 []byte mockBlock, mockNonCanonicalBlock, mockNonCanonicalBlock2 *types.Block headerCID, mockNonCanonicalHeaderCID, mockNonCanonicalHeader2CID cid.Cid trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid + wd1CID, wd2CID cid.Cid nonCanonicalBlockRct1CID, nonCanonicalBlockRct2CID cid.Cid nonCanonicalBlock2Rct1CID, nonCanonicalBlock2Rct2CID cid.Cid state1CID, state2CID, storageCID cid.Cid @@ -114,6 +116,18 @@ func init() { copy(rct5, buf.Bytes()) buf.Reset() + // encode mock withdrawals + // wds + mocks.MockWithdrawals.EncodeIndex(0, buf) + wd1 = make([]byte, buf.Len()) + copy(wd1, buf.Bytes()) + buf.Reset() + + mocks.MockWithdrawals.EncodeIndex(1, buf) + wd2 = make([]byte, buf.Len()) + copy(wd2, buf.Bytes()) + buf.Reset() + // encode mock receipts for non-canonical blocks nonCanonicalBlockRcts.EncodeIndex(0, buf) nonCanonicalBlockRct1 = make([]byte, buf.Len()) @@ -152,6 +166,9 @@ func init() { rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256) rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256) + wd1CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd1, multihash.KECCAK_256) + wd2CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd2, multihash.KECCAK_256) + // create raw receipts for non-canonical blocks nonCanonicalBlockRct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct1, multihash.KECCAK_256) nonCanonicalBlockRct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct2, multihash.KECCAK_256) diff --git a/indexer/test_helpers/test_helpers.go b/indexer/test_helpers/test_helpers.go index 1cb7b26..7e65154 100644 --- a/indexer/test_helpers/test_helpers.go +++ b/indexer/test_helpers/test_helpers.go @@ -25,16 +25,6 @@ import ( "github.com/cerc-io/plugeth-statediff/indexer/database/sql" ) -// ListContainsString used to check if a list of strings contains a particular string -func ListContainsString(sss []string, s string) bool { - for _, str := range sss { - if s == str { - return true - } - } - return false -} - // DedupFile removes duplicates from the given file func DedupFile(filePath string) error { f, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDONLY, os.ModePerm) @@ -86,6 +76,7 @@ func TearDownDB(t *testing.T, db sql.Database) { `TRUNCATE eth.state_cids`, `TRUNCATE eth.storage_cids`, `TRUNCATE eth.log_cids`, + `TRUNCATE eth.withdrawal_cids`, `TRUNCATE eth_meta.watched_addresses`, } for _, stm := range statements { diff --git a/scripts/integration-setup.sh b/scripts/run-test-stack.sh similarity index 53% rename from scripts/integration-setup.sh rename to scripts/run-test-stack.sh index 97b602d..29cb096 100755 --- a/scripts/integration-setup.sh +++ b/scripts/run-test-stack.sh @@ -1,23 +1,21 @@ #!/bin/bash -# Builds and deploys a stack with only what we need. -# This script assumes we are running in the project root. -set -e +set -ex # Note: stack path should be absolute, otherwise SO looks for it in packaged stacks -laconic_so="${LACONIC_SO:-laconic-so} --stack $(readlink -f test) --verbose" +stack_dir=$(readlink -f "${1:-../fixturenet-eth-stacks/stack-orchestrator/stacks/fixturenet-plugeth}") + +[[ -d "$stack_dir" ]] CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}") - -# Point stack-orchestrator to the multi-project root +# By default assume we are running in the project root. export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}" -# v5 migrations only go up to version 20 -echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=20 >> $CONFIG_DIR/stack.env -# don't run plugeth in the debugger +laconic_so="laconic-so --verbose --stack $stack_dir" + +# Don't run geth/plugeth in the debugger, it will swallow error backtraces echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env -set -x if [[ -z $SKIP_BUILD ]]; then $laconic_so setup-repositories \ @@ -27,6 +25,10 @@ if [[ -z $SKIP_BUILD ]]; then --exclude cerc/plugeth-statediff fi -$laconic_so deploy \ - --env-file $CONFIG_DIR/stack.env \ - --cluster test up +if ! $laconic_so deploy \ + --env-file $CONFIG_DIR/stack.env \ + --cluster test up +then + $laconic_so deploy --cluster test logs + exit 1 +fi diff --git a/test/compose.yml b/test/compose.yml index 4fcac0c..8c0c1e2 100644 --- a/test/compose.yml +++ b/test/compose.yml @@ -3,7 +3,7 @@ services: restart: on-failure depends_on: - ipld-eth-db - image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.2.1-alpha + image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.3.0-alpha environment: DATABASE_USER: "vdbm" DATABASE_NAME: "cerc_testing" diff --git a/test/stack.yml b/test/stack.yml index c5d9b7a..f7f8cd0 100644 --- a/test/stack.yml +++ b/test/stack.yml @@ -5,7 +5,7 @@ repos: - git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2 - git.vdb.to/cerc-io/plugeth-statediff - git.vdb.to/cerc-io/lighthouse - - git.vdb.to/cerc-io/ipld-eth-db@v5.2.1-alpha + - git.vdb.to/cerc-io/ipld-eth-db@v5.3.0-alpha containers: - cerc/plugeth-statediff - cerc/plugeth diff --git a/test_helpers/db.go b/test_helpers/db.go index 964b9ce..34f5462 100644 --- a/test_helpers/db.go +++ b/test_helpers/db.go @@ -22,6 +22,7 @@ func ClearDB(db *sqlx.DB) error { `TRUNCATE eth.state_cids`, `TRUNCATE eth.storage_cids`, `TRUNCATE eth.log_cids`, + `TRUNCATE eth.withdrawal_cids`, `TRUNCATE eth_meta.watched_addresses`, } for _, stm := range statements {