Index Withdrawal objects (#25)
All checks were successful
Test / Run compliance tests (push) Successful in 4m17s
Test / Run unit tests (push) Successful in 13m20s
Test / Run integration tests (push) Successful in 29m18s

Indexes the new validator withdrawal objects (from Shanghai/Capella fork: https://eips.ethereum.org/EIPS/eip-4895)
- new table `eth.withdrawal_cids`
- new column `withdrawals_root` in `eth.header_cids`
- add unit tests
- use new external stack repo in CI/CT job (cerc-io/fixturenet-eth-stacks#14)

Reviewed-on: #25
This commit is contained in:
Roy Crihfield 2024-06-28 09:44:58 +00:00
parent e6150910af
commit b5642c612a
39 changed files with 765 additions and 484 deletions

View File

@ -9,7 +9,8 @@ on:
- ci-test - ci-test
env: env:
SO_VERSION: v1.1.0-87fffca-202404110321 SO_VERSION: roysc/fix-various
FIXTURENET_ETH_STACKS_VERSION: plugeth-stack
jobs: jobs:
unit-tests: unit-tests:
@ -31,34 +32,57 @@ jobs:
name: Run integration tests name: Run integration tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
with: with:
path: ./plugeth-statediff path: ./plugeth-statediff
progress: false
- name: Build docker image - name: Build docker image
run: | run: |
docker build ./plugeth-statediff -t cerc/plugeth-statediff:local docker build ./plugeth-statediff -t cerc/plugeth-statediff:local
- name: "Install Python for ARM on Linux"
if: ${{ runner.arch == 'arm64' && runner.os == 'Linux' }}
uses: deadsnakes/action@v3.0.1
with:
python-version: 3.11
- name: "Install Python cases other than ARM on Linux"
if: ${{ ! (runner.arch == 'arm64' && runner.os == 'Linux') }}
uses: actions/setup-python@v4
with:
python-version: 3.11
- name: "Print Python version"
run: python3 --version
- name: Install stack-orchestrator - name: Install stack-orchestrator
run: | # run: |
curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so # curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so
chmod +x laconic-so # chmod +x laconic-so
- name: Clone system-tests # echo PATH="$PATH:$(pwd)" >> $GITHUB_ENV
# FIXME: merge SO fixes and revert
uses: actions/checkout@v3 uses: actions/checkout@v3
with:
repository: cerc-io/stack-orchestrator
ref: ${{ env.SO_VERSION }}
path: .tools/stack-orchestrator
- name: "Install stack orchestrator"
run: pip3 install .tools/stack-orchestrator
- name: Clone system-tests
uses: actions/checkout@v4
with: with:
repository: cerc-io/system-tests repository: cerc-io/system-tests
ref: v0.1.0-20240411 ref: v0.1.0-20240411
path: ./system-tests path: ./system-tests
token: ${{ secrets.CICD_REPO_TOKEN }} token: ${{ secrets.CICD_REPO_TOKEN }}
progress: false
- name: Clone fixturenet stack repo
uses: actions/checkout@v4
with:
repository: cerc-io/fixturenet-eth-stacks
ref: ${{ env.FIXTURENET_ETH_STACKS_VERSION }}
path: ./fixturenet-eth-stacks
progress: false
- name: Run testnet stack - name: Run testnet stack
working-directory: ./plugeth-statediff working-directory: ./plugeth-statediff
env: run: ./scripts/run-test-stack.sh
LACONIC_SO: ../laconic-so
run: ./scripts/integration-setup.sh
- name: Install Python
uses: actions/setup-python@v5
with:
python-version: 3.11
- name: Run tests - name: Run tests
working-directory: ./system-tests working-directory: ./system-tests
run: | run: |

View File

@ -1,6 +1,6 @@
# Using image with same alpine as plugeth, # Using image with same alpine as plugeth,
# but go 1.21 to evade https://github.com/Consensys/gnark-crypto/issues/468 # but go 1.21 to evade https://github.com/Consensys/gnark-crypto/issues/468
FROM golang:1.21-alpine3.18 as builder FROM golang:1.21-alpine as builder
RUN apk add --no-cache gcc musl-dev binutils-gold linux-headers git RUN apk add --no-cache gcc musl-dev binutils-gold linux-headers git

View File

@ -82,7 +82,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
// Generate the block iplds // Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
} }
@ -123,15 +123,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
// Publish and index receipts and txs // Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{ err = sdi.processObjects(blockTx, processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
blockTime: block.Time(), blockTime: block.Time(),
receipts: receipts, receipts: receipts,
txs: transactions, txs: transactions,
withdrawals: block.Withdrawals(),
rctNodes: rctNodes, rctNodes: rctNodes,
txNodes: txNodes, txNodes: txNodes,
logNodes: logNodes, logNodes: logNodes,
wdNodes: wdNodes,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -151,7 +153,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
if !ok { if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
headerNode, err := ipld.NewEthHeader(header) headerNode, err := ipld.EncodeHeader(header)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -173,6 +175,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true, Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
} }
_, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod) _, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err return headerID, err
@ -225,13 +228,15 @@ type processArgs struct {
blockTime uint64 blockTime uint64
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld.EthReceipt withdrawals types.Withdrawals
txNodes []*ipld.EthTx rctNodes []ipld.IPLD
logNodes [][]*ipld.EthLog txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
} }
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres // processObjects publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error {
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
@ -314,6 +319,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err return err
} }
} }
// Process withdrawals
for i, withdrawal := range args.withdrawals {
wdNode := args.wdNodes[i]
tx.cacheIPLD(wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: withdrawal.Index,
Validator: withdrawal.Validator,
Address: withdrawal.Address.String(),
Amount: withdrawal.Amount,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", wdModel); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -24,6 +24,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -55,6 +56,8 @@ func setupLegacyCSVIndexer(t *testing.T) {
func setupLegacyCSV(t *testing.T) { func setupLegacyCSV(t *testing.T) {
setupLegacyCSVIndexer(t) setupLegacyCSVIndexer(t)
test.SetupLegacyTestData(t, ind) test.SetupLegacyTestData(t, ind)
t.Cleanup(func() { tearDownCSV(t) })
time.Sleep(delayForDockerSync)
} }
func dumpCSVFileData(t *testing.T) { func dumpCSVFileData(t *testing.T) {
@ -64,7 +67,7 @@ func dumpCSVFileData(t *testing.T) {
localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir) localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir)
for _, tbl := range file.Tables { for _, tbl := range schema.Tables {
err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name)) err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name))
require.NoError(t, err) require.NoError(t, err)
@ -89,6 +92,7 @@ func dumpCSVFileData(t *testing.T) {
func resetAndDumpWatchedAddressesCSVFileData(t *testing.T) { func resetAndDumpWatchedAddressesCSVFileData(t *testing.T) {
test_helpers.TearDownDB(t, db) test_helpers.TearDownDB(t, db)
time.Sleep(delayForDockerSync)
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath) outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
stmt := fmt.Sprintf(pgCopyStatement, schema.TableWatchedAddresses.Name, outputFilePath) stmt := fmt.Sprintf(pgCopyStatement, schema.TableWatchedAddresses.Name, outputFilePath)
@ -111,7 +115,6 @@ func TestLegacyCSVFileIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) { t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupLegacyCSV(t) setupLegacyCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.TestLegacyIndexer(t, db) test.TestLegacyIndexer(t, db)
}) })

View File

@ -21,6 +21,7 @@ import (
"math/big" "math/big"
"os" "os"
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -30,6 +31,9 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/test" "github.com/cerc-io/plugeth-statediff/indexer/test"
) )
// docker bind mount is slow to sync files
var delayForDockerSync = 1 * time.Second
func setupCSVIndexer(t *testing.T) { func setupCSVIndexer(t *testing.T) {
if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.CSVTestConfig.OutputDir) err := os.RemoveAll(file.CSVTestConfig.OutputDir)
@ -53,18 +57,21 @@ func setupCSVIndexer(t *testing.T) {
func setupCSV(t *testing.T) { func setupCSV(t *testing.T) {
setupCSVIndexer(t) setupCSVIndexer(t)
test.SetupTestData(t, ind) test.SetupTestData(t, ind)
t.Cleanup(func() { tearDownCSV(t) })
time.Sleep(delayForDockerSync)
} }
func setupCSVNonCanonical(t *testing.T) { func setupCSVNonCanonical(t *testing.T) {
setupCSVIndexer(t) setupCSVIndexer(t)
test.SetupTestDataNonCanonical(t, ind) test.SetupTestDataNonCanonical(t, ind)
t.Cleanup(func() { tearDownCSV(t) })
time.Sleep(delayForDockerSync)
} }
func TestCSVFileIndexer(t *testing.T) { func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index header IPLDs in a single tx", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexHeaderIPLDs(t, db) test.DoTestPublishAndIndexHeaderIPLDs(t, db)
}) })
@ -72,7 +79,6 @@ func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index transaction IPLDs in a single tx", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexTransactionIPLDs(t, db) test.DoTestPublishAndIndexTransactionIPLDs(t, db)
}) })
@ -80,7 +86,6 @@ func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) { t.Run("Publish and index log IPLDs for multiple receipt of a specific block", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexLogIPLDs(t, db) test.DoTestPublishAndIndexLogIPLDs(t, db)
}) })
@ -88,15 +93,20 @@ func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index receipt IPLDs in a single tx", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexReceiptIPLDs(t, db) test.DoTestPublishAndIndexReceiptIPLDs(t, db)
}) })
t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) {
setupCSV(t)
dumpCSVFileData(t)
test.DoTestPublishAndIndexWithdrawalIPLDs(t, db)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexStateIPLDs(t, db) test.DoTestPublishAndIndexStateIPLDs(t, db)
}) })
@ -104,7 +114,6 @@ func TestCSVFileIndexer(t *testing.T) {
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
setupCSV(t) setupCSV(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexStorageIPLDs(t, db) test.DoTestPublishAndIndexStorageIPLDs(t, db)
}) })
@ -114,7 +123,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index header", func(t *testing.T) { t.Run("Publish and index header", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.TestPublishAndIndexHeaderNonCanonical(t, db) test.TestPublishAndIndexHeaderNonCanonical(t, db)
}) })
@ -122,7 +130,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index transactions", func(t *testing.T) { t.Run("Publish and index transactions", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexTransactionsNonCanonical(t, db) test.DoTestPublishAndIndexTransactionsNonCanonical(t, db)
}) })
@ -130,7 +137,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index receipts", func(t *testing.T) { t.Run("Publish and index receipts", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexReceiptsNonCanonical(t, db) test.DoTestPublishAndIndexReceiptsNonCanonical(t, db)
}) })
@ -138,7 +144,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index logs", func(t *testing.T) { t.Run("Publish and index logs", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexLogsNonCanonical(t, db) test.DoTestPublishAndIndexLogsNonCanonical(t, db)
}) })
@ -146,7 +151,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index state nodes", func(t *testing.T) { t.Run("Publish and index state nodes", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexStateNonCanonical(t, db) test.DoTestPublishAndIndexStateNonCanonical(t, db)
}) })
@ -154,7 +158,6 @@ func TestCSVFileIndexerNonCanonical(t *testing.T) {
t.Run("Publish and index storage nodes", func(t *testing.T) { t.Run("Publish and index storage nodes", func(t *testing.T) {
setupCSVNonCanonical(t) setupCSVNonCanonical(t)
dumpCSVFileData(t) dumpCSVFileData(t)
defer tearDownCSV(t)
test.DoTestPublishAndIndexStorageNonCanonical(t, db) test.DoTestPublishAndIndexStorageNonCanonical(t, db)
}) })

View File

@ -36,22 +36,8 @@ import (
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
) )
var (
Tables = []*schema.Table{
&schema.TableIPLDBlock,
&schema.TableNodeInfo,
&schema.TableHeader,
&schema.TableStateNode,
&schema.TableStorageNode,
&schema.TableUncle,
&schema.TableTransaction,
&schema.TableReceipt,
&schema.TableLog,
}
)
type tableRow struct { type tableRow struct {
table schema.Table table *schema.Table
values []interface{} values []interface{}
} }
@ -134,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err) return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
} }
writers, err := makeFileWriters(path, Tables) writers, err := makeFileWriters(path, schema.Tables)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -164,7 +150,7 @@ func (csw *CSVWriter) Loop() {
for { for {
select { select {
case row := <-csw.rows: case row := <-csw.rows:
err := csw.writers.write(&row.table, row.values...) err := csw.writers.write(row.table, row.values...)
if err != nil { if err != nil {
panic(fmt.Sprintf("error writing csv buffer: %v", err)) panic(fmt.Sprintf("error writing csv buffer: %v", err))
} }
@ -204,13 +190,13 @@ func (csw *CSVWriter) Close() error {
func (csw *CSVWriter) upsertNode(node nodeinfo.Info) { func (csw *CSVWriter) upsertNode(node nodeinfo.Info) {
var values []interface{} var values []interface{}
values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID) values = append(values, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)
csw.rows <- tableRow{schema.TableNodeInfo, values} csw.rows <- tableRow{&schema.TableNodeInfo, values}
} }
func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) { func (csw *CSVWriter) upsertIPLD(ipld models.IPLDModel) {
var values []interface{} var values []interface{}
values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data) values = append(values, ipld.BlockNumber, ipld.Key, ipld.Data)
csw.rows <- tableRow{schema.TableIPLDBlock, values} csw.rows <- tableRow{&schema.TableIPLDBlock, values}
} }
func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) { func (csw *CSVWriter) upsertIPLDDirect(blockNumber, key string, value []byte) {
@ -231,11 +217,25 @@ func (csw *CSVWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) { func (csw *CSVWriter) upsertHeaderCID(header models.HeaderModel) {
var values []interface{} var values []interface{}
values = append(values, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, values = append(values,
header.TotalDifficulty, header.NodeIDs, header.Reward, header.StateRoot, header.TxRoot, header.BlockNumber,
header.RctRoot, header.UnclesHash, header.Bloom, strconv.FormatUint(header.Timestamp, 10), header.Coinbase, header.BlockHash,
header.Canonical) header.ParentHash,
csw.rows <- tableRow{schema.TableHeader, values} header.CID,
header.TotalDifficulty,
header.NodeIDs,
header.Reward,
header.StateRoot,
header.TxRoot,
header.RctRoot,
header.UnclesHash,
header.Bloom,
strconv.FormatUint(header.Timestamp, 10),
header.Coinbase,
header.Canonical,
header.WithdrawalsRoot,
)
csw.rows <- tableRow{&schema.TableHeader, values}
metrics.IndexerMetrics.BlocksCounter.Inc(1) metrics.IndexerMetrics.BlocksCounter.Inc(1)
} }
@ -243,14 +243,14 @@ func (csw *CSVWriter) upsertUncleCID(uncle models.UncleModel) {
var values []interface{} var values []interface{}
values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, values = append(values, uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID,
uncle.Reward, uncle.Index) uncle.Reward, uncle.Index)
csw.rows <- tableRow{schema.TableUncle, values} csw.rows <- tableRow{&schema.TableUncle, values}
} }
func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) { func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
var values []interface{} var values []interface{}
values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, values = append(values, transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, transaction.Type, transaction.Value) transaction.Src, transaction.Index, transaction.Type, transaction.Value)
csw.rows <- tableRow{schema.TableTransaction, values} csw.rows <- tableRow{&schema.TableTransaction, values}
metrics.IndexerMetrics.TransactionsCounter.Inc(1) metrics.IndexerMetrics.TransactionsCounter.Inc(1)
} }
@ -258,7 +258,7 @@ func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{} var values []interface{}
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus) rct.PostState, rct.PostStatus)
csw.rows <- tableRow{schema.TableReceipt, values} csw.rows <- tableRow{&schema.TableReceipt, values}
metrics.IndexerMetrics.ReceiptsCounter.Inc(1) metrics.IndexerMetrics.ReceiptsCounter.Inc(1)
} }
@ -267,11 +267,26 @@ func (csw *CSVWriter) upsertLogCID(logs []*models.LogsModel) {
var values []interface{} var values []interface{}
values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0, values = append(values, l.BlockNumber, l.HeaderID, l.CID, l.ReceiptID, l.Address, l.Index, l.Topic0,
l.Topic1, l.Topic2, l.Topic3) l.Topic1, l.Topic2, l.Topic3)
csw.rows <- tableRow{schema.TableLog, values} csw.rows <- tableRow{&schema.TableLog, values}
metrics.IndexerMetrics.LogsCounter.Inc(1) metrics.IndexerMetrics.LogsCounter.Inc(1)
} }
} }
func (csw *CSVWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) {
var values []interface{}
values = append(values,
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount,
)
csw.rows <- tableRow{&schema.TableWithdrawal, values}
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
}
func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) { func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance := stateNode.Balance balance := stateNode.Balance
if stateNode.Removed { if stateNode.Removed {
@ -281,14 +296,14 @@ func (csw *CSVWriter) upsertStateCID(stateNode models.StateNodeModel) {
var values []interface{} var values []interface{}
values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, values = append(values, stateNode.BlockNumber, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed) csw.isDiff, balance, strconv.FormatUint(stateNode.Nonce, 10), stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)
csw.rows <- tableRow{schema.TableStateNode, values} csw.rows <- tableRow{&schema.TableStateNode, values}
} }
func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) { func (csw *CSVWriter) upsertStorageCID(storageCID models.StorageNodeModel) {
var values []interface{} var values []interface{}
values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, values = append(values, storageCID.BlockNumber, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
csw.isDiff, storageCID.Value, storageCID.Removed) csw.isDiff, storageCID.Value, storageCID.Removed)
csw.rows <- tableRow{schema.TableStorageNode, values} csw.rows <- tableRow{&schema.TableStorageNode, values}
} }
// LoadWatchedAddresses loads watched addresses from a file // LoadWatchedAddresses loads watched addresses from a file

View File

@ -84,7 +84,7 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInf
if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) { if _, err := os.Stat(outputDir); !errors.Is(err, os.ErrNotExist) {
return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir) return nil, fmt.Errorf("cannot create output directory, directory (%s) already exists", outputDir)
} }
log.Info("Writing statediff CSV files to directory", "file", outputDir) log.Info("Writing statediff CSV files", "directory", outputDir)
if watchedAddressesFilePath == "" { if watchedAddressesFilePath == "" {
watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath watchedAddressesFilePath = defaultWatchedAddressesCSVFilePath
@ -156,7 +156,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
// Generate the block iplds // Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err) return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
} }
@ -197,15 +197,16 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
t = time.Now() t = time.Now()
// write receipts and txs err = sdi.processObjects(processArgs{
err = sdi.processReceiptsAndTxs(processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
receipts: receipts, receipts: receipts,
txs: transactions, txs: transactions,
withdrawals: block.Withdrawals(),
rctNodes: rctNodes, rctNodes: rctNodes,
txNodes: txNodes, txNodes: txNodes,
logNodes: logNodes, logNodes: logNodes,
wdNodes: wdNodes,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -222,7 +223,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
// it returns the headerID // it returns the headerID
func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
// Process the header // Process the header
headerNode, err := ipld.NewEthHeader(header) headerNode, err := ipld.EncodeHeader(header)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -245,6 +246,7 @@ func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true, Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
}) })
return headerID, nil return headerID, nil
} }
@ -293,13 +295,15 @@ type processArgs struct {
blockTime uint64 blockTime uint64
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld.EthReceipt withdrawals types.Withdrawals
txNodes []*ipld.EthTx rctNodes []ipld.IPLD
logNodes [][]*ipld.EthLog txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
} }
// processReceiptsAndTxs writes receipt and tx IPLD insert SQL stmts to a file // processObjects writes receipt and tx IPLD insert SQL stmts to a file
func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error { func (sdi *StateDiffIndexer) processObjects(args processArgs) error {
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
@ -376,6 +380,21 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
} }
sdi.fileWriter.upsertLogCID(logDataSet) sdi.fileWriter.upsertLogCID(logDataSet)
} }
// Process withdrawals
for i, wd := range args.withdrawals {
wdNode := args.wdNodes[i]
sdi.fileWriter.upsertIPLDNode(args.blockNumber.String(), wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: wd.Index,
Validator: wd.Validator,
Address: wd.Address.String(),
Amount: wd.Amount,
}
sdi.fileWriter.upsertWithdrawalCID(wdModel)
}
return nil return nil
} }

View File

@ -41,6 +41,7 @@ type FileWriter interface {
upsertTransactionCID(transaction models.TxModel) upsertTransactionCID(transaction models.TxModel)
upsertReceiptCID(rct *models.ReceiptModel) upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel) upsertLogCID(logs []*models.LogsModel)
upsertWithdrawalCID(models.WithdrawalModel)
upsertStateCID(stateNode models.StateNodeModel) upsertStateCID(stateNode models.StateNodeModel)
upsertStorageCID(storageCID models.StorageNodeModel) upsertStorageCID(storageCID models.StorageNodeModel)
upsertIPLD(ipld models.IPLDModel) upsertIPLD(ipld models.IPLDModel)

View File

@ -93,6 +93,14 @@ func TestSQLFileIndexer(t *testing.T) {
test.DoTestPublishAndIndexReceiptIPLDs(t, db) test.DoTestPublishAndIndexReceiptIPLDs(t, db)
}) })
t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) {
setup(t)
dumpFileData(t)
defer tearDown(t)
test.DoTestPublishAndIndexWithdrawalIPLDs(t, db)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setup(t) setup(t)
dumpFileData(t) dumpFileData(t)

View File

@ -32,6 +32,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/models"
nodeinfo "github.com/cerc-io/plugeth-statediff/indexer/node" nodeinfo "github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
"github.com/cerc-io/plugeth-statediff/types" "github.com/cerc-io/plugeth-statediff/types"
) )
@ -145,8 +146,8 @@ const (
ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n" ipldInsert = "INSERT INTO ipld.blocks (block_number, key, data) VALUES ('%s', '%s', '\\x%x');\n"
headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " + headerInsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, " +
"state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical) VALUES " + "state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t);\n" "('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\\x%x', %d, '%s', %t, '%s');\n"
uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " + uncleInsert = "INSERT INTO eth.uncle_cids (block_number, block_hash, header_id, parent_hash, cid, reward, index) VALUES " +
"('%s', '%s', '%s', '%s', '%s', '%s', %d);\n" "('%s', '%s', '%s', '%s', '%s', '%s', %d);\n"
@ -167,6 +168,10 @@ const (
"removed, diff, val) VALUES ('%s', '%s', '%s', '%s', '%s', %t, %t, '\\x%x');\n" "removed, diff, val) VALUES ('%s', '%s', '%s', '%s', '%s', %t, %t, '\\x%x');\n"
) )
var (
withdrawalsInsert = schema.TableWithdrawal.FmtStringInsert() + ";\n"
)
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) { func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID)) sqw.stmts <- []byte(fmt.Sprintf(nodeInsert, node.GenesisBlock, node.NetworkID, node.ID, node.ClientName, node.ChainID))
} }
@ -192,9 +197,24 @@ func (sqw *SQLWriter) upsertIPLDNode(blockNumber string, i ipld.IPLD) {
} }
func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) { func (sqw *SQLWriter) upsertHeaderCID(header models.HeaderModel) {
stmt := fmt.Sprintf(headerInsert, header.BlockNumber, header.BlockHash, header.ParentHash, header.CID, stmt := fmt.Sprintf(headerInsert,
header.TotalDifficulty, formatPostgresStringArray(header.NodeIDs), header.Reward, header.StateRoot, header.TxRoot, header.BlockNumber,
header.RctRoot, header.UnclesHash, header.Bloom, header.Timestamp, header.Coinbase, header.Canonical) header.BlockHash,
header.ParentHash,
header.CID,
header.TotalDifficulty,
formatPostgresStringArray(header.NodeIDs),
header.Reward,
header.StateRoot,
header.TxRoot,
header.RctRoot,
header.UnclesHash,
header.Bloom,
header.Timestamp,
header.Coinbase,
header.Canonical,
header.WithdrawalsRoot,
)
sqw.stmts <- []byte(stmt) sqw.stmts <- []byte(stmt)
metrics.IndexerMetrics.BlocksCounter.Inc(1) metrics.IndexerMetrics.BlocksCounter.Inc(1)
} }
@ -224,6 +244,19 @@ func (sqw *SQLWriter) upsertLogCID(logs []*models.LogsModel) {
} }
} }
func (sqw *SQLWriter) upsertWithdrawalCID(withdrawal models.WithdrawalModel) {
sqw.stmts <- []byte(fmt.Sprintf(withdrawalsInsert,
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount,
))
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) { func (sqw *SQLWriter) upsertStateCID(stateNode models.StateNodeModel) {
balance := stateNode.Balance balance := stateNode.Balance
if stateNode.Removed { if stateNode.Removed {

View File

@ -56,6 +56,8 @@ type IndexerMetricsHandles struct {
ReceiptsCounter metrics.Counter ReceiptsCounter metrics.Counter
// The total number of processed logs // The total number of processed logs
LogsCounter metrics.Counter LogsCounter metrics.Counter
// The total number of processed logs
WithdrawalsCounter metrics.Counter
// The total number of access list entries processed // The total number of access list entries processed
AccessListEntriesCounter metrics.Counter AccessListEntriesCounter metrics.Counter
// Time spent waiting for free postgres tx // Time spent waiting for free postgres tx
@ -90,6 +92,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
TransactionsCounter: metrics.NewCounter(), TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(), ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(), LogsCounter: metrics.NewCounter(),
WithdrawalsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(), AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(), FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(), PostgresCommitTimer: metrics.NewTimer(),
@ -113,6 +116,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter) reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter) reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter) reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
reg.Register(metricName(subsys, "withdrawals"), ctx.WithdrawalsCounter)
reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter) reg.Register(metricName(subsys, "access_list_entries"), ctx.AccessListEntriesCounter)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer) reg.Register(metricName(subsys, "t_free_postgres"), ctx.FreePostgresTimer)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.PostgresCommitTimer)

View File

@ -105,7 +105,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
// Generate the block iplds // Generate the block iplds
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts) txNodes, rctNodes, logNodes, wdNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil { if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err) return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
} }
@ -148,16 +148,18 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t)) metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
t = time.Now() t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(batch, processArgs{ err = sdi.processObjects(batch, processArgs{
headerID: headerID, headerID: headerID,
blockNumber: block.Number(), blockNumber: block.Number(),
blockTime: block.Time(), blockTime: block.Time(),
receipts: receipts, receipts: receipts,
withdrawals: block.Withdrawals(),
txs: transactions, txs: transactions,
rctNodes: rctNodes, rctNodes: rctNodes,
txNodes: txNodes, txNodes: txNodes,
logNodes: logNodes, logNodes: logNodes,
wdNodes: wdNodes,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -185,7 +187,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch) return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
} }
// Process the header // Process the header
headerNode, err := ipld.NewEthHeader(header) headerNode, err := ipld.EncodeHeader(header)
if err != nil { if err != nil {
return "", err return "", err
} }
@ -208,6 +210,7 @@ func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.He
Timestamp: header.Time, Timestamp: header.Time,
Coinbase: header.Coinbase.String(), Coinbase: header.Coinbase.String(),
Canonical: true, Canonical: true,
WithdrawalsRoot: shared.MaybeStringHash(header.WithdrawalsHash),
}) })
} }
@ -258,13 +261,15 @@ type processArgs struct {
blockTime uint64 blockTime uint64
receipts types.Receipts receipts types.Receipts
txs types.Transactions txs types.Transactions
rctNodes []*ipld.EthReceipt withdrawals types.Withdrawals
txNodes []*ipld.EthTx rctNodes []ipld.IPLD
logNodes [][]*ipld.EthLog txNodes []ipld.IPLD
logNodes [][]ipld.IPLD
wdNodes []ipld.IPLD
} }
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres // processObjects publishes and indexes receipt and transaction IPLDs in Postgres
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs) error { func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error {
// Process receipts and txs // Process receipts and txs
signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime) signer := types.MakeSigner(sdi.chainConfig, args.blockNumber, args.blockTime)
for i, receipt := range args.receipts { for i, receipt := range args.receipts {
@ -348,7 +353,23 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *BatchTx, args processArgs
return err return err
} }
} }
// Process withdrawals
for i, withdrawal := range args.withdrawals {
wdNode := args.wdNodes[i]
tx.cacheIPLD(wdNode)
wdModel := models.WithdrawalModel{
BlockNumber: args.blockNumber.String(),
HeaderID: args.headerID,
CID: wdNode.Cid().String(),
Index: withdrawal.Index,
Validator: withdrawal.Validator,
Address: withdrawal.Address.String(),
Amount: withdrawal.Amount,
}
if err := sdi.dbWriter.upsertWithdrawalCID(tx.dbtx, wdModel); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -54,6 +54,7 @@ type Statements interface {
InsertTxStm() string InsertTxStm() string
InsertRctStm() string InsertRctStm() string
InsertLogStm() string InsertLogStm() string
InsertWithdrawalStm() string
InsertStateStm() string InsertStateStm() string
InsertStorageStm() string InsertStorageStm() string
InsertIPLDStm() string InsertIPLDStm() string

View File

@ -96,6 +96,14 @@ func TestPGXIndexer(t *testing.T) {
test.DoTestPublishAndIndexReceiptIPLDs(t, db) test.DoTestPublishAndIndexReceiptIPLDs(t, db)
}) })
t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) {
setupPGX(t)
defer tearDown(t)
defer checkTxClosure(t, 1, 0, 1)
test.DoTestPublishAndIndexWithdrawalIPLDs(t, db)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupPGX(t) setupPGX(t)
defer tearDown(t) defer tearDown(t)

View File

@ -18,6 +18,7 @@ package postgres
import ( import (
"fmt" "fmt"
"strings"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema" "github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
@ -43,7 +44,9 @@ type DB struct {
// MaxHeaderStm satisfies the sql.Statements interface // MaxHeaderStm satisfies the sql.Statements interface
func (db *DB) MaxHeaderStm() string { func (db *DB) MaxHeaderStm() string {
return fmt.Sprintf("SELECT block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase FROM %s ORDER BY block_number DESC LIMIT 1", schema.TableHeader.Name) return fmt.Sprintf("SELECT %s FROM %s ORDER BY block_number DESC LIMIT 1",
strings.Join(schema.TableHeader.ColumnNames(), ","),
schema.TableHeader.Name)
} }
// ExistsHeaderStm satisfies the sql.Statements interface // ExistsHeaderStm satisfies the sql.Statements interface
@ -59,7 +62,7 @@ func (db *DB) DetectGapsStm() string {
// InsertHeaderStm satisfies the sql.Statements interface // InsertHeaderStm satisfies the sql.Statements interface
// Stm == Statement // Stm == Statement
func (db *DB) InsertHeaderStm() string { func (db *DB) InsertHeaderStm() string {
return schema.TableHeader.ToInsertStatement(db.upsert) return schema.TableHeader.PreparedInsert(db.upsert)
} }
// SetCanonicalHeaderStm satisfies the sql.Statements interface // SetCanonicalHeaderStm satisfies the sql.Statements interface
@ -70,37 +73,42 @@ func (db *DB) SetCanonicalHeaderStm() string {
// InsertUncleStm satisfies the sql.Statements interface // InsertUncleStm satisfies the sql.Statements interface
func (db *DB) InsertUncleStm() string { func (db *DB) InsertUncleStm() string {
return schema.TableUncle.ToInsertStatement(db.upsert) return schema.TableUncle.PreparedInsert(db.upsert)
} }
// InsertTxStm satisfies the sql.Statements interface // InsertTxStm satisfies the sql.Statements interface
func (db *DB) InsertTxStm() string { func (db *DB) InsertTxStm() string {
return schema.TableTransaction.ToInsertStatement(db.upsert) return schema.TableTransaction.PreparedInsert(db.upsert)
} }
// InsertRctStm satisfies the sql.Statements interface // InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string { func (db *DB) InsertRctStm() string {
return schema.TableReceipt.ToInsertStatement(db.upsert) return schema.TableReceipt.PreparedInsert(db.upsert)
} }
// InsertLogStm satisfies the sql.Statements interface // InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertLogStm() string { func (db *DB) InsertLogStm() string {
return schema.TableLog.ToInsertStatement(db.upsert) return schema.TableLog.PreparedInsert(db.upsert)
}
// InsertLogStm satisfies the sql.Statements interface
func (db *DB) InsertWithdrawalStm() string {
return schema.TableWithdrawal.PreparedInsert(db.upsert)
} }
// InsertStateStm satisfies the sql.Statements interface // InsertStateStm satisfies the sql.Statements interface
func (db *DB) InsertStateStm() string { func (db *DB) InsertStateStm() string {
return schema.TableStateNode.ToInsertStatement(db.upsert) return schema.TableStateNode.PreparedInsert(db.upsert)
} }
// InsertStorageStm satisfies the sql.Statements interface // InsertStorageStm satisfies the sql.Statements interface
func (db *DB) InsertStorageStm() string { func (db *DB) InsertStorageStm() string {
return schema.TableStorageNode.ToInsertStatement(db.upsert) return schema.TableStorageNode.PreparedInsert(db.upsert)
} }
// InsertIPLDStm satisfies the sql.Statements interface // InsertIPLDStm satisfies the sql.Statements interface
func (db *DB) InsertIPLDStm() string { func (db *DB) InsertIPLDStm() string {
return schema.TableIPLDBlock.ToInsertStatement(db.upsert) return schema.TableIPLDBlock.PreparedInsert(db.upsert)
} }
// InsertIPLDsStm satisfies the sql.Statements interface // InsertIPLDsStm satisfies the sql.Statements interface

View File

@ -82,6 +82,14 @@ func TestSQLXIndexer(t *testing.T) {
test.DoTestPublishAndIndexReceiptIPLDs(t, db) test.DoTestPublishAndIndexReceiptIPLDs(t, db)
}) })
t.Run("Publish and index withdrawal IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t)
defer tearDown(t)
defer checkTxClosure(t, 0, 0, 0)
test.DoTestPublishAndIndexWithdrawalIPLDs(t, db)
})
t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) { t.Run("Publish and index state IPLDs in a single tx", func(t *testing.T) {
setupSQLX(t) setupSQLX(t)
defer tearDown(t) defer tearDown(t)

View File

@ -95,6 +95,7 @@ func (w *Writer) maxHeader() (*models.HeaderModel, error) {
&model.Timestamp, &model.Timestamp,
&model.Coinbase, &model.Coinbase,
&model.Canonical, &model.Canonical,
&model.WithdrawalsRoot,
) )
model.BlockNumber = strconv.FormatUint(number, 10) model.BlockNumber = strconv.FormatUint(number, 10)
model.TotalDifficulty = strconv.FormatUint(td, 10) model.TotalDifficulty = strconv.FormatUint(td, 10)
@ -125,6 +126,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
header.Timestamp, header.Timestamp,
header.Coinbase, header.Coinbase,
header.Canonical, header.Canonical,
header.WithdrawalsRoot,
) )
if err != nil { if err != nil {
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
@ -286,6 +288,41 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
return nil return nil
} }
func (w *Writer) upsertWithdrawalCID(tx Tx, withdrawal models.WithdrawalModel) error {
if w.useCopyForTx(tx) {
blockNum, err := strconv.ParseUint(withdrawal.BlockNumber, 10, 64)
if err != nil {
return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal}
}
_, err = tx.CopyFrom(w.db.Context(), schema.TableWithdrawal.TableName(), schema.TableWithdrawal.ColumnNames(),
toRows(toRow(blockNum,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount)))
if err != nil {
return insertError{"eth.withdrawal_cids", err, "COPY", withdrawal}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertWithdrawalStm(),
withdrawal.BlockNumber,
withdrawal.HeaderID,
withdrawal.CID,
withdrawal.Index,
withdrawal.Validator,
withdrawal.Address,
withdrawal.Amount)
if err != nil {
return insertError{"eth.withdrawal_cids", err, w.db.InsertWithdrawalStm(), withdrawal}
}
}
metrics.IndexerMetrics.WithdrawalsCounter.Inc(1)
return nil
}
/* /*
INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, removed, diff, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, removed, diff, balance, nonce, code_hash, storage_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (header_id, state_leaf_key, block_number) DO NOTHING ON CONFLICT (header_id, state_leaf_key, block_number) DO NOTHING

102
indexer/ipld/encode.go Normal file
View File

@ -0,0 +1,102 @@
// VulcanizeDB
// Copyright © 2024 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
mh "github.com/multiformats/go-multihash"
)
// EncodeHeader converts a *types.Header into an IPLD node
func EncodeHeader(header *types.Header) (IPLD, error) {
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: headerRLP,
}, nil
}
// encodeTx converts a *types.Transaction to an IPLD node
func encodeTx(tx *types.Transaction) (IPLD, error) {
txRaw, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: txRaw,
}, nil
}
// encodeReceipt converts a types.Receipt to an IPLD node
func encodeReceipt(receipt *types.Receipt) (IPLD, error) {
rctRaw, err := receipt.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: rctRaw,
}, nil
}
// encodeLog converts a Log to an IPLD node
func encodeLog(log *types.Log) (IPLD, error) {
logRaw, err := rlp.EncodeToBytes(log)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: logRaw,
}, nil
}
func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) {
wRaw, err := rlp.EncodeToBytes(w)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthWithdrawal, wRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &node{
cid: c,
rawdata: wRaw,
}, nil
}

View File

@ -1,60 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)
// EthHeader (eth-block, codec 0x90), represents an ethereum block header
type EthHeader struct {
cid cid.Cid
rawdata []byte
}
// Static (compile time) check that EthHeader satisfies the node.Node interface.
var _ IPLD = (*EthHeader)(nil)
// NewEthHeader converts a *types.Header into an EthHeader IPLD node
func NewEthHeader(header *types.Header) (*EthHeader, error) {
headerRLP, err := rlp.EncodeToBytes(header)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthHeader, headerRLP, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthHeader{
cid: c,
rawdata: headerRLP,
}, nil
}
// RawData returns the binary of the RLP encode of the block header.
func (b *EthHeader) RawData() []byte {
return b.rawdata
}
// Cid returns the cid of the block header.
func (b *EthHeader) Cid() cid.Cid {
return b.cid
}

View File

@ -1,43 +0,0 @@
package ipld
import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
)
// EthLog (eth-log, codec 0x9a), represents an ethereum block header
type EthLog struct {
rawData []byte
cid cid.Cid
}
// Static (compile time) check that EthLog satisfies the node.Node interface.
var _ IPLD = (*EthLog)(nil)
// NewLog create a new EthLog IPLD node
func NewLog(log *types.Log) (*EthLog, error) {
logRaw, err := rlp.EncodeToBytes(log)
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthLog, logRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthLog{
cid: c,
rawData: logRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the log.
func (l *EthLog) RawData() []byte {
return l.rawData
}
// Cid returns the cid of the receipt log.
func (l *EthLog) Cid() cid.Cid {
return l.cid
}

View File

@ -22,25 +22,29 @@ import (
// FromBlockAndReceipts takes a block and processes it // FromBlockAndReceipts takes a block and processes it
// to return it a set of IPLD nodes for further processing. // to return it a set of IPLD nodes for further processing.
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) { func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD, []IPLD, [][]IPLD, []IPLD, error) {
// Process the txs // Process the txs
txNodes, err := processTransactions(block.Transactions()) txNodes, err := processTransactions(block.Transactions())
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, nil, err
}
withdrawalNodes, err := processWithdrawals(block.Withdrawals())
if err != nil {
return nil, nil, nil, nil, err
} }
// Process the receipts and logs // Process the receipts and logs
rctNodes, logNodes, err := processReceiptsAndLogs(receipts) rctNodes, logNodes, err := processReceiptsAndLogs(receipts)
return txNodes, rctNodes, logNodes, err return txNodes, rctNodes, logNodes, withdrawalNodes, err
} }
// processTransactions will take the found transactions in a parsed block body // processTransactions will take the found transactions in a parsed block body
// to return IPLD node slices for eth-tx // to return IPLD node slices for eth-tx
func processTransactions(txs []*types.Transaction) ([]*EthTx, error) { func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
var ethTxNodes []*EthTx var ethTxNodes []IPLD
for _, tx := range txs { for _, tx := range txs {
ethTx, err := NewEthTx(tx) ethTx, err := encodeTx(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -50,12 +54,25 @@ func processTransactions(txs []*types.Transaction) ([]*EthTx, error) {
return ethTxNodes, nil return ethTxNodes, nil
} }
func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) {
var withdrawalNodes []IPLD
for _, withdrawal := range withdrawals {
ethW, err := encodeWithdrawal(withdrawal)
if err != nil {
return nil, err
}
withdrawalNodes = append(withdrawalNodes, ethW)
}
return withdrawalNodes, nil
}
// processReceiptsAndLogs will take in receipts // processReceiptsAndLogs will take in receipts
// to return IPLD node slices for eth-rct and eth-log // to return IPLD node slices for eth-rct and eth-log
func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog, error) { func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
// Pre allocating memory. // Pre allocating memory.
ethRctNodes := make([]*EthReceipt, len(rcts)) ethRctNodes := make([]IPLD, len(rcts))
ethLogNodes := make([][]*EthLog, len(rcts)) ethLogNodes := make([][]IPLD, len(rcts))
for idx, rct := range rcts { for idx, rct := range rcts {
logNodes, err := processLogs(rct.Logs) logNodes, err := processLogs(rct.Logs)
@ -63,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog,
return nil, nil, err return nil, nil, err
} }
ethRct, err := NewReceipt(rct) ethRct, err := encodeReceipt(rct)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -75,10 +92,10 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]*EthReceipt, [][]*EthLog,
return ethRctNodes, ethLogNodes, nil return ethRctNodes, ethLogNodes, nil
} }
func processLogs(logs []*types.Log) ([]*EthLog, error) { func processLogs(logs []*types.Log) ([]IPLD, error) {
logNodes := make([]*EthLog, len(logs)) logNodes := make([]IPLD, len(logs))
for idx, log := range logs { for idx, log := range logs {
logNode, err := NewLog(log) logNode, err := encodeLog(log)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -92,7 +92,7 @@ func loadBlockData(t *testing.T) []testCase {
func TestFromBlockAndReceipts(t *testing.T) { func TestFromBlockAndReceipts(t *testing.T) {
testCases := loadBlockData(t) testCases := loadBlockData(t)
for _, tc := range testCases { for _, tc := range testCases {
_, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts) _, _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts)
if err != nil { if err != nil {
t.Fatalf("error generating IPLDs from block and receipts, err %v, kind %s, block hash %s", err, tc.kind, tc.block.Hash()) t.Fatalf("error generating IPLDs from block and receipts, err %v, kind %s, block hash %s", err, tc.kind, tc.block.Hash())
} }

View File

@ -1,58 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
)
type EthReceipt struct {
rawdata []byte
cid cid.Cid
}
// Static (compile time) check that EthReceipt satisfies the node.Node interface.
var _ IPLD = (*EthReceipt)(nil)
// NewReceipt converts a types.ReceiptForStorage to an EthReceipt IPLD node
func NewReceipt(receipt *types.Receipt) (*EthReceipt, error) {
rctRaw, err := receipt.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTxReceipt, rctRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthReceipt{
cid: c,
rawdata: rctRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the receipt.
func (r *EthReceipt) RawData() []byte {
return r.rawdata
}
// Cid returns the cid of the receipt.
func (r *EthReceipt) Cid() cid.Cid {
return r.cid
}

View File

@ -1,59 +0,0 @@
// VulcanizeDB
// Copyright © 2019 Vulcanize
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package ipld
import (
"github.com/ipfs/go-cid"
mh "github.com/multiformats/go-multihash"
"github.com/ethereum/go-ethereum/core/types"
)
// EthTx (eth-tx codec 0x93) represents an ethereum transaction
type EthTx struct {
cid cid.Cid
rawdata []byte
}
// Static (compile time) check that EthTx satisfies the node.Node interface.
var _ IPLD = (*EthTx)(nil)
// NewEthTx converts a *types.Transaction to an EthTx IPLD node
func NewEthTx(tx *types.Transaction) (*EthTx, error) {
txRaw, err := tx.MarshalBinary()
if err != nil {
return nil, err
}
c, err := RawdataToCid(MEthTx, txRaw, mh.KECCAK_256)
if err != nil {
return nil, err
}
return &EthTx{
cid: c,
rawdata: txRaw,
}, nil
}
// RawData returns the binary of the RLP encode of the transaction.
func (t *EthTx) RawData() []byte {
return t.rawdata
}
// Cid returns the cid of the transaction.
func (t *EthTx) Cid() cid.Cid {
return t.cid
}

View File

@ -2,7 +2,25 @@ package ipld
import "github.com/ipfs/go-cid" import "github.com/ipfs/go-cid"
// Check that node satisfies the IPLD Node interface.
var _ IPLD = (*node)(nil)
type node struct {
cid cid.Cid
rawdata []byte
}
type IPLD interface { type IPLD interface {
Cid() cid.Cid Cid() cid.Cid
RawData() []byte RawData() []byte
} }
// RawData returns the RLP encoded bytes of the node.
func (b node) RawData() []byte {
return b.rawdata
}
// Cid returns the CID of the node.
func (b node) Cid() cid.Cid {
return b.cid
}

View File

@ -37,6 +37,7 @@ const (
MEthStorageTrie = 0x98 MEthStorageTrie = 0x98
MEthLogTrie = 0x99 MEthLogTrie = 0x99
MEthLog = 0x9a MEthLog = 0x9a
MEthWithdrawal = 0x9b // TODO add to multicodec registry
) )
// RawdataToCid takes the desired codec and a slice of bytes // RawdataToCid takes the desired codec and a slice of bytes

View File

@ -19,8 +19,8 @@ package mocks
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"crypto/elliptic" "crypto/elliptic"
"crypto/rand"
"math/big" "math/big"
"math/rand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
@ -39,6 +39,9 @@ import (
// Test variables // Test variables
var ( var (
// RNG for deterministically generated keys
rng = rand.New(rand.NewSource(0))
// block data // block data
TestChainConfig = params.MainnetChainConfig TestChainConfig = params.MainnetChainConfig
BlockNumber = TestChainConfig.LondonBlock BlockNumber = TestChainConfig.LondonBlock
@ -58,15 +61,19 @@ var (
Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"), Coinbase: common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476777"),
} }
MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestChainConfig, BlockNumber, BlockTime) MockTransactions, MockReceipts, SenderAddr = createTransactionsAndReceipts(TestChainConfig, BlockNumber, BlockTime)
MockBlock = types.NewBlock(&MockHeader, MockTransactions, nil, MockReceipts, trie.NewEmpty(nil)) MockWithdrawals = types.Withdrawals{
MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header()) {Index: 0, Validator: 1, Address: Address, Amount: 1000000000},
{Index: 1, Validator: 5, Address: AnotherAddress, Amount: 2000000000},
}
MockBlock = types.NewBlockWithWithdrawals(&MockHeader, MockTransactions, nil, MockReceipts, MockWithdrawals, trie.NewEmpty(nil))
MockHeaderRlp, _ = rlp.EncodeToBytes(MockBlock.Header())
// non-canonical block at London height // non-canonical block at London height
// includes 2nd and 5th transactions from the canonical block // includes 2nd and 5th transactions from the canonical block
MockNonCanonicalHeader = MockHeader MockNonCanonicalHeader = MockHeader
MockNonCanonicalBlockTransactions = types.Transactions{MockTransactions[1], MockTransactions[4]} MockNonCanonicalBlockTransactions = types.Transactions{MockTransactions[1], MockTransactions[4]}
MockNonCanonicalBlockReceipts = createNonCanonicalBlockReceipts(TestChainConfig, BlockNumber, BlockTime, MockNonCanonicalBlockTransactions) MockNonCanonicalBlockReceipts = createNonCanonicalBlockReceipts(TestChainConfig, BlockNumber, BlockTime, MockNonCanonicalBlockTransactions)
MockNonCanonicalBlock = types.NewBlock(&MockNonCanonicalHeader, MockNonCanonicalBlockTransactions, nil, MockNonCanonicalBlockReceipts, trie.NewEmpty(nil)) MockNonCanonicalBlock = types.NewBlockWithWithdrawals(&MockNonCanonicalHeader, MockNonCanonicalBlockTransactions, nil, MockNonCanonicalBlockReceipts, MockWithdrawals[:1], trie.NewEmpty(nil))
MockNonCanonicalHeaderRlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock.Header()) MockNonCanonicalHeaderRlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock.Header())
// non-canonical block at London height + 1 // non-canonical block at London height + 1
@ -86,7 +93,7 @@ var (
} }
MockNonCanonicalBlock2Transactions = types.Transactions{MockTransactions[2], MockTransactions[4]} MockNonCanonicalBlock2Transactions = types.Transactions{MockTransactions[2], MockTransactions[4]}
MockNonCanonicalBlock2Receipts = createNonCanonicalBlockReceipts(TestChainConfig, Block2Number, BlockTime, MockNonCanonicalBlock2Transactions) MockNonCanonicalBlock2Receipts = createNonCanonicalBlockReceipts(TestChainConfig, Block2Number, BlockTime, MockNonCanonicalBlock2Transactions)
MockNonCanonicalBlock2 = types.NewBlock(&MockNonCanonicalHeader2, MockNonCanonicalBlock2Transactions, nil, MockNonCanonicalBlock2Receipts, trie.NewEmpty(nil)) MockNonCanonicalBlock2 = types.NewBlockWithWithdrawals(&MockNonCanonicalHeader2, MockNonCanonicalBlock2Transactions, nil, MockNonCanonicalBlock2Receipts, types.Withdrawals{}, trie.NewEmpty(nil))
MockNonCanonicalHeader2Rlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock2.Header()) MockNonCanonicalHeader2Rlp, _ = rlp.EncodeToBytes(MockNonCanonicalBlock2.Header())
Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592") Address = common.HexToAddress("0xaE9BEa628c4Ce503DcFD7E305CaB4e29E7476592")
@ -348,7 +355,10 @@ func NewLegacyData(config *params.ChainConfig) *LegacyData {
mockTransactions, mockReceipts, senderAddr := createLegacyTransactionsAndReceipts(config, blockNumber) mockTransactions, mockReceipts, senderAddr := createLegacyTransactionsAndReceipts(config, blockNumber)
mockBlock := types.NewBlock(&mockHeader, mockTransactions, nil, mockReceipts, trie.NewEmpty(nil)) mockBlock := types.NewBlock(&mockHeader, mockTransactions, nil, mockReceipts, trie.NewEmpty(nil))
mockHeaderRlp, _ := rlp.EncodeToBytes(mockBlock.Header()) mockHeaderRlp, err := rlp.EncodeToBytes(mockBlock.Header())
if err != nil {
panic(err)
}
contractAddress := crypto.CreateAddress(senderAddr, mockTransactions[2].Nonce()) contractAddress := crypto.CreateAddress(senderAddr, mockTransactions[2].Nonce())
return &LegacyData{ return &LegacyData{
@ -388,7 +398,7 @@ func createLegacyTransactionsAndReceipts(config *params.ChainConfig, blockNumber
blockTime := uint64(0) blockTime := uint64(0)
transactionSigner := types.MakeSigner(config, blockNumber, blockTime) transactionSigner := types.MakeSigner(config, blockNumber, blockTime)
mockCurve := elliptic.P256() mockCurve := elliptic.P256()
mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng)
if err != nil { if err != nil {
log.Crit(err.Error()) log.Crit(err.Error())
} }
@ -460,7 +470,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
transactionSigner := types.MakeSigner(config, blockNumber, blockTime) transactionSigner := types.MakeSigner(config, blockNumber, blockTime)
mockCurve := elliptic.P256() mockCurve := elliptic.P256()
mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng)
if err != nil { if err != nil {
log.Crit(err.Error()) log.Crit(err.Error())
} }
@ -524,7 +534,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
func createNonCanonicalBlockReceipts(config *params.ChainConfig, blockNumber *big.Int, blockTime uint64, transactions types.Transactions) types.Receipts { func createNonCanonicalBlockReceipts(config *params.ChainConfig, blockNumber *big.Int, blockTime uint64, transactions types.Transactions) types.Receipts {
transactionSigner := types.MakeSigner(config, blockNumber, blockTime) transactionSigner := types.MakeSigner(config, blockNumber, blockTime)
mockCurve := elliptic.P256() mockCurve := elliptic.P256()
mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rand.Reader) mockPrvKey, err := ecdsa.GenerateKey(mockCurve, rng)
if err != nil { if err != nil {
log.Crit(err.Error()) log.Crit(err.Error())
} }

View File

@ -42,6 +42,7 @@ type HeaderModel struct {
Timestamp uint64 `db:"timestamp"` Timestamp uint64 `db:"timestamp"`
Coinbase string `db:"coinbase"` Coinbase string `db:"coinbase"`
Canonical bool `db:"canonical"` Canonical bool `db:"canonical"`
WithdrawalsRoot string `db:"withdrawals_root"`
} }
// UncleModel is the db model for eth.uncle_cids // UncleModel is the db model for eth.uncle_cids
@ -105,7 +106,7 @@ type StorageNodeModel struct {
Value []byte `db:"val"` Value []byte `db:"val"`
} }
// LogsModel is the db model for eth.logs // LogsModel is the db model for eth.log_cids
type LogsModel struct { type LogsModel struct {
BlockNumber string `db:"block_number"` BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"` HeaderID string `db:"header_id"`
@ -118,3 +119,14 @@ type LogsModel struct {
Topic2 string `db:"topic2"` Topic2 string `db:"topic2"`
Topic3 string `db:"topic3"` Topic3 string `db:"topic3"`
} }
// WithdrawalModel is the db model for eth.withdrawal_cids
type WithdrawalModel struct {
BlockNumber string `db:"block_number"`
HeaderID string `db:"header_id"`
CID string `db:"cid"`
Index uint64 `db:"index"`
Validator uint64 `db:"validator"`
Address string `db:"address"`
Amount uint64 `db:"amount"`
}

View File

@ -35,3 +35,12 @@ func HandleZeroAddr(to common.Address) string {
} }
return to.String() return to.String()
} }
// MaybeStringHash calls String on its argument and returns a pointer to the result.
// When passed nil, it returns nil.
func MaybeStringHash(hash *common.Hash) string {
if hash == nil {
return ""
}
return hash.String()
}

View File

@ -16,6 +16,19 @@
package schema package schema
var Tables = []*Table{
&TableIPLDBlock,
&TableNodeInfo,
&TableHeader,
&TableStateNode,
&TableStorageNode,
&TableUncle,
&TableTransaction,
&TableReceipt,
&TableLog,
&TableWithdrawal,
}
var TableIPLDBlock = Table{ var TableIPLDBlock = Table{
Name: `ipld.blocks`, Name: `ipld.blocks`,
Columns: []Column{ Columns: []Column{
@ -52,9 +65,10 @@ var TableHeader = Table{
{Name: "receipt_root", Type: Dvarchar}, {Name: "receipt_root", Type: Dvarchar},
{Name: "uncles_hash", Type: Dvarchar}, {Name: "uncles_hash", Type: Dvarchar},
{Name: "bloom", Type: Dbytea}, {Name: "bloom", Type: Dbytea},
{Name: "timestamp", Type: Dnumeric}, {Name: "timestamp", Type: Dbigint},
{Name: "coinbase", Type: Dvarchar}, {Name: "coinbase", Type: Dvarchar},
{Name: "canonical", Type: Dboolean}, {Name: "canonical", Type: Dboolean},
{Name: "withdrawals_root", Type: Dvarchar},
}, },
UpsertClause: OnConflict("block_number", "block_hash").Set( UpsertClause: OnConflict("block_number", "block_hash").Set(
"parent_hash", "parent_hash",
@ -70,6 +84,7 @@ var TableHeader = Table{
"timestamp", "timestamp",
"coinbase", "coinbase",
"canonical", "canonical",
"withdrawals_root",
)} )}
var TableStateNode = Table{ var TableStateNode = Table{
@ -165,6 +180,20 @@ var TableLog = Table{
UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"), UpsertClause: OnConflict("block_number", "header_id", "rct_id", "index"),
} }
var TableWithdrawal = Table{
Name: "eth.withdrawal_cids",
Columns: []Column{
{Name: "block_number", Type: Dbigint},
{Name: "header_id", Type: Dvarchar},
{Name: "cid", Type: Dtext},
{Name: "index", Type: Dinteger},
{Name: "validator", Type: Dinteger},
{Name: "address", Type: Dvarchar},
{Name: "amount", Type: Dinteger},
},
UpsertClause: OnConflict("block_number", "header_id", "index"),
}
var TableWatchedAddresses = Table{ var TableWatchedAddresses = Table{
Name: "eth_meta.watched_addresses", Name: "eth_meta.watched_addresses",
Columns: []Column{ Columns: []Column{

View File

@ -53,34 +53,6 @@ type Table struct {
UpsertClause ConflictClause UpsertClause ConflictClause
} }
type colfmt = func(interface{}) string
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.Type.formatter()(args[i])
if col.Array {
valueList := funk.Map(args[i], col.Type.formatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col Column) bool {
return col.Type == Dvarchar
}).([]Column)
columnNames := funk.Map(columns, func(col Column) string {
return col.Name
}).([]string)
return columnNames
}
func OnConflict(target ...string) ConflictClause { func OnConflict(target ...string) ConflictClause {
return ConflictClause{Target: target} return ConflictClause{Target: target}
} }
@ -89,35 +61,6 @@ func (c ConflictClause) Set(fields ...string) ConflictClause {
return c return c
} }
// ToInsertStatement returns a Postgres-compatible SQL insert statement for the table
// using positional placeholders
func (tbl *Table) ToInsertStatement(upsert bool) string {
var colnames, placeholders []string
for i, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
suffix := fmt.Sprintf("ON CONFLICT (%s)", strings.Join(tbl.UpsertClause.Target, ", "))
if upsert && len(tbl.UpsertClause.Update) != 0 {
var update_placeholders []string
for _, name := range tbl.UpsertClause.Update {
i := funk.IndexOf(tbl.Columns, func(col Column) bool { return col.Name == name })
update_placeholders = append(update_placeholders, fmt.Sprintf("$%d", i+1))
}
suffix += fmt.Sprintf(
" DO UPDATE SET (%s) = (%s)",
strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "),
)
} else {
suffix += " DO NOTHING"
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s) %s",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "), suffix,
)
}
// TableName returns a pgx-compatible table name. // TableName returns a pgx-compatible table name.
func (tbl *Table) TableName() []string { func (tbl *Table) TableName() []string {
return strings.Split(tbl.Name, ".") return strings.Split(tbl.Name, ".")
@ -132,11 +75,45 @@ func (tbl *Table) ColumnNames() []string {
return names return names
} }
// PreparedInsert returns a pgx/sqlx-compatible SQL prepared insert statement for the table
// using positional placeholders.
// If upsert is true, include an ON CONFLICT clause handling column updates.
func (tbl *Table) PreparedInsert(upsert bool) string {
var colnames, placeholders []string
for i, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, fmt.Sprintf("$%d", i+1))
}
suffix := " ON CONFLICT"
if len(tbl.UpsertClause.Target) > 0 {
suffix += fmt.Sprintf(" (%s)", strings.Join(tbl.UpsertClause.Target, ", "))
}
if upsert && len(tbl.UpsertClause.Update) != 0 {
var update_placeholders []string
for _, name := range tbl.UpsertClause.Update {
update_placeholders = append(update_placeholders, "EXCLUDED."+name)
}
suffix += fmt.Sprintf(
" DO UPDATE SET (%s) = ROW(%s)",
strings.Join(tbl.UpsertClause.Update, ", "), strings.Join(update_placeholders, ", "),
)
} else {
suffix += " DO NOTHING"
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s)",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "),
) + suffix
}
type colfmt = func(interface{}) string
func sprintf(f string) colfmt { func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) } return func(x interface{}) string { return fmt.Sprintf(f, x) }
} }
func (typ colType) formatter() colfmt { func (typ colType) csvFormatter() colfmt {
switch typ { switch typ {
case Dinteger: case Dinteger:
return sprintf("%d") return sprintf("%d")
@ -157,6 +134,61 @@ func (typ colType) formatter() colfmt {
return sprintf("%s") return sprintf("%s")
case Dtext: case Dtext:
return sprintf("%s") return sprintf("%s")
default:
panic("invalid column type")
} }
panic("unreachable") }
// ToCsvRow converts a list of values to a list of strings suitable for CSV output.
func (tbl *Table) ToCsvRow(args ...interface{}) []string {
var row []string
for i, col := range tbl.Columns {
value := col.Type.csvFormatter()(args[i])
if col.Array {
valueList := funk.Map(args[i], col.Type.csvFormatter()).([]string)
value = fmt.Sprintf("{%s}", strings.Join(valueList, ","))
}
row = append(row, value)
}
return row
}
// VarcharColumns returns the names of columns with type VARCHAR.
func (tbl *Table) VarcharColumns() []string {
columns := funk.Filter(tbl.Columns, func(col Column) bool {
return col.Type == Dvarchar
}).([]Column)
columnNames := funk.Map(columns, func(col Column) string {
return col.Name
}).([]string)
return columnNames
}
func formatSpec(typ colType) string {
switch typ {
case Dinteger:
return "%d"
case Dboolean:
return "%t"
case Dbytea:
return `'\x%x'`
default:
return "'%s'"
}
}
// FmtStringInsert returns a format string for creating a Postgres insert statement.
func (tbl *Table) FmtStringInsert() string {
var colnames, placeholders []string
for _, col := range tbl.Columns {
colnames = append(colnames, col.Name)
placeholders = append(placeholders, formatSpec(col.Type))
}
return fmt.Sprintf(
"INSERT INTO %s (%s) VALUES (%s);",
tbl.Name, strings.Join(colnames, ", "), strings.Join(placeholders, ", "),
)
} }

View File

@ -8,47 +8,55 @@ import (
. "github.com/cerc-io/plugeth-statediff/indexer/shared/schema" . "github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
) )
var testHeaderTable = Table{ var (
Name: "eth.header_cids", testTable = Table{
Columns: []Column{ Name: "test_table",
{Name: "block_number", Type: Dbigint}, Columns: []Column{
{Name: "block_hash", Type: Dvarchar}, {Name: "id", Type: Dbigint},
{Name: "parent_hash", Type: Dvarchar}, {Name: "name", Type: Dvarchar},
{Name: "cid", Type: Dtext}, {Name: "age", Type: Dinteger},
{Name: "td", Type: Dnumeric}, },
{Name: "node_id", Type: Dvarchar}, }
{Name: "reward", Type: Dnumeric}, testTableWithConflictClause = Table{
{Name: "state_root", Type: Dvarchar}, Name: "test_table_conflict",
{Name: "tx_root", Type: Dvarchar}, Columns: []Column{
{Name: "receipt_root", Type: Dvarchar}, {Name: "id", Type: Dbigint},
{Name: "uncle_root", Type: Dvarchar}, {Name: "name", Type: Dvarchar},
{Name: "bloom", Type: Dbytea}, {Name: "age", Type: Dinteger},
{Name: "timestamp", Type: Dnumeric}, },
{Name: "mh_key", Type: Dtext}, UpsertClause: OnConflict("id").Set("name", "age"),
{Name: "times_validated", Type: Dinteger}, }
{Name: "coinbase", Type: Dvarchar}, )
},
UpsertClause: OnConflict("block_hash", "block_number").Set( const (
"parent_hash", expectedHeaderPreparedWithUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO UPDATE SET (parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) = ROW(EXCLUDED.parent_hash, EXCLUDED.cid, EXCLUDED.td, EXCLUDED.node_ids, EXCLUDED.reward, EXCLUDED.state_root, EXCLUDED.tx_root, EXCLUDED.receipt_root, EXCLUDED.uncles_hash, EXCLUDED.bloom, EXCLUDED.timestamp, EXCLUDED.coinbase, EXCLUDED.canonical, EXCLUDED.withdrawals_root)"
"cid",
"td", expectedHeaderPreparedWithoutUpsert = "INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_number, block_hash) DO NOTHING"
"node_id",
"reward", expectedHeaderFmtString = `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_ids, reward, state_root, tx_root, receipt_root, uncles_hash, bloom, timestamp, coinbase, canonical, withdrawals_root) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '\x%x', '%s', '%s', %t, '%s');`
"state_root", )
"tx_root",
"receipt_root",
"uncle_root",
"bloom",
"timestamp",
"mh_key",
"times_validated",
"coinbase",
),
}
func TestTable(t *testing.T) { func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)` require.Equal(t,
headerNoUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO NOTHING` "INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
require.Equal(t, headerNoUpsert, testHeaderTable.ToInsertStatement(false)) testTable.PreparedInsert(true),
require.Equal(t, headerUpsert, testHeaderTable.ToInsertStatement(true)) )
require.Equal(t,
"INSERT INTO test_table (id, name, age) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING",
testTable.PreparedInsert(false),
)
require.Equal(t, "INSERT INTO test_table (id, name, age) VALUES ('%s', '%s', %d);", testTable.FmtStringInsert())
require.Equal(t,
"INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET (name, age) = ROW(EXCLUDED.name, EXCLUDED.age)",
testTableWithConflictClause.PreparedInsert(true),
)
require.Equal(t,
"INSERT INTO test_table_conflict (id, name, age) VALUES ($1, $2, $3) ON CONFLICT (id) DO NOTHING",
testTableWithConflictClause.PreparedInsert(false),
)
require.Equal(t, expectedHeaderPreparedWithUpsert, TableHeader.PreparedInsert(true))
require.Equal(t, expectedHeaderPreparedWithoutUpsert, TableHeader.PreparedInsert(false))
require.Equal(t, expectedHeaderFmtString, TableHeader.FmtStringInsert())
} }

View File

@ -33,10 +33,9 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/mocks" "github.com/cerc-io/plugeth-statediff/indexer/mocks"
"github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/indexer/test_helpers"
) )
// SetupTestData indexes a single mock block along with it's state nodes // SetupTestData indexes a single mock block along with its state nodes
func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) { func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
var tx interfaces.Batch var tx interfaces.Batch
tx, err = ind.PushBlock( tx, err = ind.PushBlock(
@ -111,11 +110,11 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
t.Fatal(err) t.Fatal(err)
} }
require.Equal(t, 5, len(trxs)) require.Equal(t, 5, len(trxs))
expectTrue(t, test_helpers.ListContainsString(trxs, trx1CID.String())) require.Contains(t, trxs, trx1CID.String())
expectTrue(t, test_helpers.ListContainsString(trxs, trx2CID.String())) require.Contains(t, trxs, trx2CID.String())
expectTrue(t, test_helpers.ListContainsString(trxs, trx3CID.String())) require.Contains(t, trxs, trx3CID.String())
expectTrue(t, test_helpers.ListContainsString(trxs, trx4CID.String())) require.Contains(t, trxs, trx4CID.String())
expectTrue(t, test_helpers.ListContainsString(trxs, trx5CID.String())) require.Contains(t, trxs, trx5CID.String())
transactions := mocks.MockBlock.Transactions() transactions := mocks.MockBlock.Transactions()
type txResult struct { type txResult struct {
@ -257,11 +256,11 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) {
t.Fatal(err) t.Fatal(err)
} }
require.Equal(t, 5, len(rcts)) require.Equal(t, 5, len(rcts))
expectTrue(t, test_helpers.ListContainsString(rcts, rct1CID.String())) require.Contains(t, rcts, rct1CID.String())
expectTrue(t, test_helpers.ListContainsString(rcts, rct2CID.String())) require.Contains(t, rcts, rct2CID.String())
expectTrue(t, test_helpers.ListContainsString(rcts, rct3CID.String())) require.Contains(t, rcts, rct3CID.String())
expectTrue(t, test_helpers.ListContainsString(rcts, rct4CID.String())) require.Contains(t, rcts, rct4CID.String())
expectTrue(t, test_helpers.ListContainsString(rcts, rct5CID.String())) require.Contains(t, rcts, rct5CID.String())
for idx, c := range rcts { for idx, c := range rcts {
result := make([]models.IPLDModel, 0) result := make([]models.IPLDModel, 0)
@ -335,6 +334,41 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) {
} }
} }
func DoTestPublishAndIndexWithdrawalIPLDs(t *testing.T, db sql.Database) {
// check that withdrawals were properly indexed and published
wds := make([]string, 0)
pgStr := `SELECT withdrawal_cids.cid FROM eth.withdrawal_cids
INNER JOIN eth.header_cids ON (withdrawal_cids.header_id = header_cids.block_hash)
WHERE header_cids.block_number = $1
ORDER BY withdrawal_cids.index`
err = db.Select(context.Background(), &wds, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
require.Equal(t, 2, len(wds))
require.Contains(t, wds, wd1CID.String())
require.Contains(t, wds, wd2CID.String())
for _, c := range wds {
dc, err := cid.Decode(c)
if err != nil {
t.Fatal(err)
}
var data []byte
err = db.Get(context.Background(), &data, ipfsPgGet, dc.String(), mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
switch c {
case wd1CID.String():
require.Equal(t, wd1, data)
case wd2CID.String():
require.Equal(t, wd2, data)
}
}
}
func DoTestPublishAndIndexStateIPLDs(t *testing.T, db sql.Database) { func DoTestPublishAndIndexStateIPLDs(t *testing.T, db sql.Database) {
// check that state nodes were properly indexed and published // check that state nodes were properly indexed and published
stateNodes := make([]models.StateNodeModel, 0) stateNodes := make([]models.StateNodeModel, 0)
@ -594,7 +628,7 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) { func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) {
// check indexed headers // check indexed headers
pgStr := `SELECT CAST(block_number as TEXT), block_hash, cid, cast(td AS TEXT), cast(reward AS TEXT), pgStr := `SELECT CAST(block_number as TEXT), block_hash, cid, cast(td AS TEXT), cast(reward AS TEXT),
tx_root, receipt_root, uncles_hash, coinbase tx_root, receipt_root, uncles_hash, coinbase, withdrawals_root
FROM eth.header_cids FROM eth.header_cids
ORDER BY block_number` ORDER BY block_number`
headerRes := make([]models.HeaderModel, 0) headerRes := make([]models.HeaderModel, 0)
@ -616,6 +650,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) {
RctRoot: mockBlock.ReceiptHash().String(), RctRoot: mockBlock.ReceiptHash().String(),
UnclesHash: mockBlock.UncleHash().String(), UnclesHash: mockBlock.UncleHash().String(),
Coinbase: mocks.MockHeader.Coinbase.String(), Coinbase: mocks.MockHeader.Coinbase.String(),
WithdrawalsRoot: shared.MaybeStringHash(mockBlock.Header().WithdrawalsHash),
}, },
{ {
BlockNumber: mockNonCanonicalBlock.Number().String(), BlockNumber: mockNonCanonicalBlock.Number().String(),
@ -626,6 +661,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) {
RctRoot: mockNonCanonicalBlock.ReceiptHash().String(), RctRoot: mockNonCanonicalBlock.ReceiptHash().String(),
UnclesHash: mockNonCanonicalBlock.UncleHash().String(), UnclesHash: mockNonCanonicalBlock.UncleHash().String(),
Coinbase: mocks.MockNonCanonicalHeader.Coinbase.String(), Coinbase: mocks.MockNonCanonicalHeader.Coinbase.String(),
WithdrawalsRoot: shared.MaybeStringHash(mockNonCanonicalBlock.Header().WithdrawalsHash),
}, },
{ {
BlockNumber: mockNonCanonicalBlock2.Number().String(), BlockNumber: mockNonCanonicalBlock2.Number().String(),
@ -636,6 +672,7 @@ func TestPublishAndIndexHeaderNonCanonical(t *testing.T, db sql.Database) {
RctRoot: mockNonCanonicalBlock2.ReceiptHash().String(), RctRoot: mockNonCanonicalBlock2.ReceiptHash().String(),
UnclesHash: mockNonCanonicalBlock2.UncleHash().String(), UnclesHash: mockNonCanonicalBlock2.UncleHash().String(),
Coinbase: mocks.MockNonCanonicalHeader2.Coinbase.String(), Coinbase: mocks.MockNonCanonicalHeader2.Coinbase.String(),
WithdrawalsRoot: shared.MaybeStringHash(mockNonCanonicalBlock2.Header().WithdrawalsHash),
}, },
} }
expectedRes[0].Reward = shared.CalcEthBlockReward(mockBlock.Header(), mockBlock.Uncles(), mockBlock.Transactions(), mocks.MockReceipts).String() expectedRes[0].Reward = shared.CalcEthBlockReward(mockBlock.Header(), mockBlock.Uncles(), mockBlock.Transactions(), mocks.MockReceipts).String()

View File

@ -38,12 +38,14 @@ var (
watchedAddressesPgGet = `SELECT * watchedAddressesPgGet = `SELECT *
FROM eth_meta.watched_addresses` FROM eth_meta.watched_addresses`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
wd1, wd2 []byte
nonCanonicalBlockRct1, nonCanonicalBlockRct2 []byte nonCanonicalBlockRct1, nonCanonicalBlockRct2 []byte
nonCanonicalBlock2Rct1, nonCanonicalBlock2Rct2 []byte nonCanonicalBlock2Rct1, nonCanonicalBlock2Rct2 []byte
mockBlock, mockNonCanonicalBlock, mockNonCanonicalBlock2 *types.Block mockBlock, mockNonCanonicalBlock, mockNonCanonicalBlock2 *types.Block
headerCID, mockNonCanonicalHeaderCID, mockNonCanonicalHeader2CID cid.Cid headerCID, mockNonCanonicalHeaderCID, mockNonCanonicalHeader2CID cid.Cid
trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
wd1CID, wd2CID cid.Cid
nonCanonicalBlockRct1CID, nonCanonicalBlockRct2CID cid.Cid nonCanonicalBlockRct1CID, nonCanonicalBlockRct2CID cid.Cid
nonCanonicalBlock2Rct1CID, nonCanonicalBlock2Rct2CID cid.Cid nonCanonicalBlock2Rct1CID, nonCanonicalBlock2Rct2CID cid.Cid
state1CID, state2CID, storageCID cid.Cid state1CID, state2CID, storageCID cid.Cid
@ -114,6 +116,18 @@ func init() {
copy(rct5, buf.Bytes()) copy(rct5, buf.Bytes())
buf.Reset() buf.Reset()
// encode mock withdrawals
// wds
mocks.MockWithdrawals.EncodeIndex(0, buf)
wd1 = make([]byte, buf.Len())
copy(wd1, buf.Bytes())
buf.Reset()
mocks.MockWithdrawals.EncodeIndex(1, buf)
wd2 = make([]byte, buf.Len())
copy(wd2, buf.Bytes())
buf.Reset()
// encode mock receipts for non-canonical blocks // encode mock receipts for non-canonical blocks
nonCanonicalBlockRcts.EncodeIndex(0, buf) nonCanonicalBlockRcts.EncodeIndex(0, buf)
nonCanonicalBlockRct1 = make([]byte, buf.Len()) nonCanonicalBlockRct1 = make([]byte, buf.Len())
@ -152,6 +166,9 @@ func init() {
rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256) rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256)
rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256) rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256)
wd1CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd1, multihash.KECCAK_256)
wd2CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd2, multihash.KECCAK_256)
// create raw receipts for non-canonical blocks // create raw receipts for non-canonical blocks
nonCanonicalBlockRct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct1, multihash.KECCAK_256) nonCanonicalBlockRct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct1, multihash.KECCAK_256)
nonCanonicalBlockRct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct2, multihash.KECCAK_256) nonCanonicalBlockRct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, nonCanonicalBlockRct2, multihash.KECCAK_256)

View File

@ -25,16 +25,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/sql" "github.com/cerc-io/plugeth-statediff/indexer/database/sql"
) )
// ListContainsString used to check if a list of strings contains a particular string
func ListContainsString(sss []string, s string) bool {
for _, str := range sss {
if s == str {
return true
}
}
return false
}
// DedupFile removes duplicates from the given file // DedupFile removes duplicates from the given file
func DedupFile(filePath string) error { func DedupFile(filePath string) error {
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDONLY, os.ModePerm) f, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDONLY, os.ModePerm)
@ -86,6 +76,7 @@ func TearDownDB(t *testing.T, db sql.Database) {
`TRUNCATE eth.state_cids`, `TRUNCATE eth.state_cids`,
`TRUNCATE eth.storage_cids`, `TRUNCATE eth.storage_cids`,
`TRUNCATE eth.log_cids`, `TRUNCATE eth.log_cids`,
`TRUNCATE eth.withdrawal_cids`,
`TRUNCATE eth_meta.watched_addresses`, `TRUNCATE eth_meta.watched_addresses`,
} }
for _, stm := range statements { for _, stm := range statements {

View File

@ -1,23 +1,21 @@
#!/bin/bash #!/bin/bash
# Builds and deploys a stack with only what we need.
# This script assumes we are running in the project root.
set -e set -ex
# Note: stack path should be absolute, otherwise SO looks for it in packaged stacks # Note: stack path should be absolute, otherwise SO looks for it in packaged stacks
laconic_so="${LACONIC_SO:-laconic-so} --stack $(readlink -f test) --verbose" stack_dir=$(readlink -f "${1:-../fixturenet-eth-stacks/stack-orchestrator/stacks/fixturenet-plugeth}")
[[ -d "$stack_dir" ]]
CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}") CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}")
# By default assume we are running in the project root.
# Point stack-orchestrator to the multi-project root
export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}" export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}"
# v5 migrations only go up to version 20 laconic_so="laconic-so --verbose --stack $stack_dir"
echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=20 >> $CONFIG_DIR/stack.env
# don't run plugeth in the debugger # Don't run geth/plugeth in the debugger, it will swallow error backtraces
echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env echo CERC_REMOTE_DEBUG=false >> $CONFIG_DIR/stack.env
set -x
if [[ -z $SKIP_BUILD ]]; then if [[ -z $SKIP_BUILD ]]; then
$laconic_so setup-repositories \ $laconic_so setup-repositories \
@ -27,6 +25,10 @@ if [[ -z $SKIP_BUILD ]]; then
--exclude cerc/plugeth-statediff --exclude cerc/plugeth-statediff
fi fi
$laconic_so deploy \ if ! $laconic_so deploy \
--env-file $CONFIG_DIR/stack.env \ --env-file $CONFIG_DIR/stack.env \
--cluster test up --cluster test up
then
$laconic_so deploy --cluster test logs
exit 1
fi

View File

@ -3,7 +3,7 @@ services:
restart: on-failure restart: on-failure
depends_on: depends_on:
- ipld-eth-db - ipld-eth-db
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.2.1-alpha image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.3.0-alpha
environment: environment:
DATABASE_USER: "vdbm" DATABASE_USER: "vdbm"
DATABASE_NAME: "cerc_testing" DATABASE_NAME: "cerc_testing"

View File

@ -5,7 +5,7 @@ repos:
- git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2 - git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2
- git.vdb.to/cerc-io/plugeth-statediff - git.vdb.to/cerc-io/plugeth-statediff
- git.vdb.to/cerc-io/lighthouse - git.vdb.to/cerc-io/lighthouse
- git.vdb.to/cerc-io/ipld-eth-db@v5.2.1-alpha - git.vdb.to/cerc-io/ipld-eth-db@v5.3.0-alpha
containers: containers:
- cerc/plugeth-statediff - cerc/plugeth-statediff
- cerc/plugeth - cerc/plugeth

View File

@ -22,6 +22,7 @@ func ClearDB(db *sqlx.DB) error {
`TRUNCATE eth.state_cids`, `TRUNCATE eth.state_cids`,
`TRUNCATE eth.storage_cids`, `TRUNCATE eth.storage_cids`,
`TRUNCATE eth.log_cids`, `TRUNCATE eth.log_cids`,
`TRUNCATE eth.withdrawal_cids`,
`TRUNCATE eth_meta.watched_addresses`, `TRUNCATE eth_meta.watched_addresses`,
} }
for _, stm := range statements { for _, stm := range statements {