Compare commits

...

6 Commits

Author SHA1 Message Date
50224c1273 guard delayed tx cache 2024-08-01 08:18:43 -05:00
8fa5eb7dd6 sort imports 2024-08-01 07:40:57 -05:00
175b7b9968 godoc for indexer 2024-08-01 07:22:00 -05:00
cfa64279da rm unused stack.yml 2024-08-01 07:21:58 -05:00
9a18f199e1 Index blob hashes (#27)
All checks were successful
Test / Run compliance tests (push) Successful in 4m22s
Test / Run unit tests (push) Successful in 11m29s
Test / Run integration tests (push) Successful in 24m58s
Reviewed-on: #27
2024-08-01 02:50:29 +00:00
589f8a3977 Add test utilities for dependents (#30)
All checks were successful
Test / Run unit tests (push) Successful in 14m21s
Test / Run compliance tests (push) Successful in 6m19s
Test / Run integration tests (push) Successful in 30m50s
- Exports IPLD encoders.
- Refactors `ClearDB` to source list of tables from `schema` package, and to allow downstream usage.

Reviewed-on: #30
2024-07-13 11:09:39 +00:00
34 changed files with 491 additions and 354 deletions

View File

@ -7,14 +7,16 @@ import (
)
func ChainConfig(cc *plugeth_params.ChainConfig) *params.ChainConfig {
return &params.ChainConfig{
ret := &params.ChainConfig{
ChainID: cc.ChainID,
HomesteadBlock: cc.HomesteadBlock,
DAOForkBlock: cc.DAOForkBlock,
DAOForkSupport: cc.DAOForkSupport,
EIP150Block: cc.EIP150Block,
EIP155Block: cc.EIP155Block,
EIP158Block: cc.EIP158Block,
ByzantiumBlock: cc.ByzantiumBlock,
ConstantinopleBlock: cc.ConstantinopleBlock,
PetersburgBlock: cc.PetersburgBlock,
@ -22,5 +24,23 @@ func ChainConfig(cc *plugeth_params.ChainConfig) *params.ChainConfig {
MuirGlacierBlock: cc.MuirGlacierBlock,
BerlinBlock: cc.BerlinBlock,
LondonBlock: cc.LondonBlock,
ArrowGlacierBlock: cc.ArrowGlacierBlock,
GrayGlacierBlock: cc.GrayGlacierBlock,
MergeNetsplitBlock: cc.MergeNetsplitBlock,
ShanghaiTime: cc.ShanghaiTime,
CancunTime: cc.CancunTime,
PragueTime: cc.PragueTime,
TerminalTotalDifficulty: cc.TerminalTotalDifficulty,
TerminalTotalDifficultyPassed: cc.TerminalTotalDifficultyPassed,
}
if cc.Ethash != nil {
ret.Ethash = &params.EthashConfig{}
}
if cc.Clique != nil {
ret.Clique = &params.CliqueConfig{cc.Clique.Period, cc.Clique.Epoch}
}
return ret
}

View File

@ -70,12 +70,12 @@ func NewStateDiffIndexer(
var driver sql.Driver
switch pgc.Driver {
case postgres.PGX:
driver, err = postgres.NewPGXDriver(ctx, pgc, nodeInfo)
driver, err = postgres.ConnectPGXDriver(ctx, pgc, nodeInfo)
if err != nil {
return nil, nil, err
}
case postgres.SQLX:
driver, err = postgres.NewSQLXDriver(ctx, pgc, nodeInfo)
driver, err = postgres.ConnectSQLXDriver(ctx, pgc, nodeInfo)
if err != nil {
return nil, nil, err
}

View File

@ -273,6 +273,20 @@ func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error
return err
}
if trx.Type() == types.BlobTxType {
blobHashes := trx.BlobHashes()
for i, hash := range blobHashes {
bhModel := models.BlobHashModel{
TxHash: trxID,
Index: uint64(i),
BlobHash: hash,
}
if _, err := fmt.Fprintf(sdi.dump, "%+v\r\n", bhModel); err != nil {
return err
}
}
}
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)

View File

@ -67,7 +67,7 @@ func dumpCSVFileData(t *testing.T) {
localOutputDir := filepath.Join(workingDir, file.CSVTestConfig.OutputDir)
for _, tbl := range schema.Tables {
for _, tbl := range schema.EthTables {
err := test_helpers.DedupFile(file.TableFilePath(localOutputDir, tbl.Name))
require.NoError(t, err)

View File

@ -120,7 +120,7 @@ func NewCSVWriter(path string, watchedAddressesFilePath string, diff bool) (*CSV
return nil, fmt.Errorf("unable to create directory '%s': %w", path, err)
}
writers, err := makeFileWriters(path, schema.Tables)
writers, err := makeFileWriters(path, schema.EthTables)
if err != nil {
return nil, err
}
@ -254,6 +254,13 @@ func (csw *CSVWriter) upsertTransactionCID(transaction models.TxModel) {
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (csw *CSVWriter) upsertBlobHash(blobHash models.BlobHashModel) {
var values []interface{}
values = append(values, blobHash.TxHash, blobHash.Index, blobHash.BlobHash)
csw.rows <- tableRow{&schema.TableBlobHash, values}
metrics.IndexerMetrics.BlobHashCounter.Inc(1)
}
func (csw *CSVWriter) upsertReceiptCID(rct *models.ReceiptModel) {
var values []interface{}
values = append(values, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,

View File

@ -191,7 +191,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
t = time.Now()
// write uncles
sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
err = sdi.processUncles(headerID, block.Number(), block.UncleHash(), block.Uncles())
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
@ -200,6 +203,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
err = sdi.processObjects(processArgs{
headerID: headerID,
blockNumber: block.Number(),
blockTime: block.Time(),
receipts: receipts,
txs: transactions,
withdrawals: block.Withdrawals(),
@ -338,6 +342,17 @@ func (sdi *StateDiffIndexer) processObjects(args processArgs) error {
}
sdi.fileWriter.upsertTransactionCID(txModel)
if trx.Type() == types.BlobTxType {
blobHashes := trx.BlobHashes()
for i, hash := range blobHashes {
sdi.fileWriter.upsertBlobHash(models.BlobHashModel{
TxHash: txID,
Index: uint64(i),
BlobHash: hash,
})
}
}
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)

View File

@ -39,6 +39,7 @@ type FileWriter interface {
upsertHeaderCID(header models.HeaderModel)
upsertUncleCID(uncle models.UncleModel)
upsertTransactionCID(transaction models.TxModel)
upsertBlobHash(models.BlobHashModel)
upsertReceiptCID(rct *models.ReceiptModel)
upsertLogCID(logs []*models.LogsModel)
upsertWithdrawalCID(models.WithdrawalModel)

View File

@ -170,6 +170,7 @@ const (
var (
withdrawalsInsert = schema.TableWithdrawal.FmtStringInsert() + ";\n"
blobHashesInsert = schema.TableBlobHash.FmtStringInsert() + ";\n"
)
func (sqw *SQLWriter) upsertNode(node nodeinfo.Info) {
@ -230,6 +231,11 @@ func (sqw *SQLWriter) upsertTransactionCID(transaction models.TxModel) {
metrics.IndexerMetrics.TransactionsCounter.Inc(1)
}
func (sqw *SQLWriter) upsertBlobHash(bh models.BlobHashModel) {
sqw.stmts <- []byte(fmt.Sprintf(blobHashesInsert, bh.TxHash, bh.Index, bh.BlobHash))
metrics.IndexerMetrics.BlobHashCounter.Inc(1)
}
func (sqw *SQLWriter) upsertReceiptCID(rct *models.ReceiptModel) {
sqw.stmts <- []byte(fmt.Sprintf(rctInsert, rct.BlockNumber, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, rct.PostStatus))

View File

@ -52,6 +52,8 @@ type IndexerMetricsHandles struct {
BlocksCounter metrics.Counter
// The total number of processed transactions
TransactionsCounter metrics.Counter
// The total number of indexed blob hashes
BlobHashCounter metrics.Counter
// The total number of processed receipts
ReceiptsCounter metrics.Counter
// The total number of processed logs
@ -90,6 +92,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{
BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(),
BlobHashCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
WithdrawalsCounter: metrics.NewCounter(),
@ -114,6 +117,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
reg.Register(metricName(subsys, "transactions"), ctx.TransactionsCounter)
reg.Register(metricName(subsys, "blob_hashes"), ctx.BlobHashCounter)
reg.Register(metricName(subsys, "receipts"), ctx.ReceiptsCounter)
reg.Register(metricName(subsys, "logs"), ctx.LogsCounter)
reg.Register(metricName(subsys, "withdrawals"), ctx.WithdrawalsCounter)

View File

@ -306,6 +306,19 @@ func (sdi *StateDiffIndexer) processObjects(tx *BatchTx, args processArgs) error
return err
}
if trx.Type() == types.BlobTxType {
blobHashes := trx.BlobHashes()
for i, hash := range blobHashes {
if err := sdi.dbWriter.upsertBlobHash(tx.dbtx, models.BlobHashModel{
TxHash: txID,
Index: uint64(i),
BlobHash: hash,
}); err != nil {
return err
}
}
}
// this is the contract address if this receipt is for a contract creation tx
contract := shared.HandleZeroAddr(receipt.ContractAddress)

View File

@ -52,6 +52,7 @@ type Statements interface {
SetCanonicalHeaderStm() string
InsertUncleStm() string
InsertTxStm() string
InsertBlobHashStm() string
InsertRctStm() string
InsertLogStm() string
InsertWithdrawalStm() string

View File

@ -3,6 +3,7 @@ package sql
import (
"context"
"reflect"
"sync"
"time"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
@ -15,6 +16,7 @@ const copyFromCheckLimit = 100
type DelayedTx struct {
cache []interface{}
db Database
sync.RWMutex
}
type cachedStmt struct {
sql string
@ -27,6 +29,8 @@ type copyFrom struct {
rows [][]interface{}
}
type result int64
func (cf *copyFrom) appendRows(rows [][]interface{}) {
cf.rows = append(cf.rows, rows...)
}
@ -44,6 +48,8 @@ func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface
}
func (tx *DelayedTx) findPrevCopyFrom(tableName []string, columnNames []string, limit int) (*copyFrom, int) {
tx.RLock()
defer tx.RUnlock()
for pos, count := len(tx.cache)-1, 0; pos >= 0 && count < limit; pos, count = pos-1, count+1 {
prevCopy, ok := tx.cache[pos].(*copyFrom)
if ok && prevCopy.matches(tableName, columnNames) {
@ -59,6 +65,8 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
"current", len(prevCopy.rows), "new", len(rows), "distance", distance)
prevCopy.appendRows(rows)
} else {
tx.Lock()
defer tx.Unlock()
tx.cache = append(tx.cache, &copyFrom{tableName, columnNames, rows})
}
@ -66,8 +74,10 @@ func (tx *DelayedTx) CopyFrom(ctx context.Context, tableName []string, columnNam
}
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
tx.Lock()
defer tx.Unlock()
tx.cache = append(tx.cache, cachedStmt{sql, args})
return nil, nil
return result(0), nil
}
func (tx *DelayedTx) Commit(ctx context.Context) error {
@ -85,6 +95,8 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
rollback(ctx, base)
}
}()
tx.Lock()
defer tx.Unlock()
for _, item := range tx.cache {
switch item := item.(type) {
case *copyFrom:
@ -105,6 +117,13 @@ func (tx *DelayedTx) Commit(ctx context.Context) error {
}
func (tx *DelayedTx) Rollback(ctx context.Context) error {
tx.Lock()
defer tx.Unlock()
tx.cache = nil
return nil
}
// RowsAffected satisfies sql.Result
func (r result) RowsAffected() (int64, error) {
return int64(r), nil
}

View File

@ -81,6 +81,11 @@ func (db *DB) InsertTxStm() string {
return schema.TableTransaction.PreparedInsert(db.upsert)
}
// InsertBlobHashStm satisfies the sql.Statements interface
func (db *DB) InsertBlobHashStm() string {
return schema.TableBlobHash.PreparedInsert(db.upsert)
}
// InsertRctStm satisfies the sql.Statements interface
func (db *DB) InsertRctStm() string {
return schema.TableReceipt.PreparedInsert(db.upsert)

View File

@ -37,7 +37,6 @@ type PGXDriver struct {
ctx context.Context
pool *pgxpool.Pool
nodeInfo node.Info
nodeID string
config Config
}
@ -50,21 +49,25 @@ func ConnectPGX(ctx context.Context, config Config) (*pgxpool.Pool, error) {
return pgxpool.ConnectConfig(ctx, pgConf)
}
// NewPGXDriver returns a new pgx driver
// ConnectPGXDriver returns a new pgx driver
// it initializes the connection pool and creates the node info table
func NewPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) {
func ConnectPGXDriver(ctx context.Context, config Config, node node.Info) (*PGXDriver, error) {
dbPool, err := ConnectPGX(ctx, config)
if err != nil {
return nil, ErrDBConnectionFailed(err)
}
pg := &PGXDriver{ctx: ctx, pool: dbPool, nodeInfo: node, config: config}
nodeErr := pg.createNode()
pg := NewPGXDriver(ctx, dbPool, config)
nodeErr := pg.createNode(node)
if nodeErr != nil {
return &PGXDriver{}, ErrUnableToSetNode(nodeErr)
}
return pg, nil
}
func NewPGXDriver(ctx context.Context, pool *pgxpool.Pool, config Config) *PGXDriver {
return &PGXDriver{ctx: ctx, pool: pool, config: config}
}
// MakeConfig creates a pgxpool.Config from the provided Config
func MakeConfig(config Config) (*pgxpool.Config, error) {
conf, err := pgxpool.ParseConfig("")
@ -102,19 +105,19 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
return conf, nil
}
func (pgx *PGXDriver) createNode() error {
func (pgx *PGXDriver) createNode(nodeInfo node.Info) error {
_, err := pgx.pool.Exec(
pgx.ctx,
createNodeStm,
pgx.nodeInfo.GenesisBlock,
pgx.nodeInfo.NetworkID,
pgx.nodeInfo.ID,
pgx.nodeInfo.ClientName,
pgx.nodeInfo.ChainID)
nodeInfo.GenesisBlock,
nodeInfo.NetworkID,
nodeInfo.ID,
nodeInfo.ClientName,
nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
pgx.nodeID = pgx.nodeInfo.ID
pgx.nodeInfo = nodeInfo
return nil
}
@ -155,7 +158,7 @@ func (pgx *PGXDriver) Stats() metrics.DbStats {
// NodeID satisfies sql.Database
func (pgx *PGXDriver) NodeID() string {
return pgx.nodeID
return pgx.nodeInfo.ID
}
// Close satisfies sql.Database/io.Closer

View File

@ -94,7 +94,7 @@ func TestPostgresPGX(t *testing.T) {
t.Run("throws error when can't connect to the database", func(t *testing.T) {
goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewPGXDriver(ctx, postgres.Config{}, goodInfo)
_, err := postgres.ConnectPGXDriver(ctx, postgres.Config{}, goodInfo)
if err == nil {
t.Fatal("Expected an error")
}
@ -106,7 +106,7 @@ func TestPostgresPGX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewPGXDriver(ctx, pgConfig, badInfo)
_, err := postgres.ConnectPGXDriver(ctx, pgConfig, badInfo)
if err == nil {
t.Fatal("Expected an error")
}

View File

@ -34,7 +34,6 @@ type SQLXDriver struct {
ctx context.Context
db *sqlx.DB
nodeInfo node.Info
nodeID string
}
// ConnectSQLX initializes and returns a SQLX connection pool for postgres
@ -53,32 +52,36 @@ func ConnectSQLX(ctx context.Context, config Config) (*sqlx.DB, error) {
return db, nil
}
// NewSQLXDriver returns a new sqlx driver for Postgres
// ConnectSQLXDriver returns a new sqlx driver for Postgres
// it initializes the connection pool and creates the node info table
func NewSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) {
func ConnectSQLXDriver(ctx context.Context, config Config, node node.Info) (*SQLXDriver, error) {
db, err := ConnectSQLX(ctx, config)
if err != nil {
return nil, err
}
driver := &SQLXDriver{ctx: ctx, db: db, nodeInfo: node}
if err := driver.createNode(); err != nil {
driver := NewSQLXDriver(ctx, db)
if err := driver.createNode(node); err != nil {
return nil, err
}
return driver, nil
}
func (driver *SQLXDriver) createNode() error {
func NewSQLXDriver(ctx context.Context, db *sqlx.DB) *SQLXDriver {
return &SQLXDriver{ctx: ctx, db: db}
}
func (driver *SQLXDriver) createNode(nodeInfo node.Info) error {
_, err := driver.db.Exec(
createNodeStm,
driver.nodeInfo.GenesisBlock,
driver.nodeInfo.NetworkID,
driver.nodeInfo.ID,
driver.nodeInfo.ClientName,
driver.nodeInfo.ChainID)
nodeInfo.GenesisBlock,
nodeInfo.NetworkID,
nodeInfo.ID,
nodeInfo.ClientName,
nodeInfo.ChainID)
if err != nil {
return ErrUnableToSetNode(err)
}
driver.nodeID = driver.nodeInfo.ID
driver.nodeInfo = nodeInfo
return nil
}
@ -118,7 +121,7 @@ func (driver *SQLXDriver) Stats() metrics.DbStats {
// NodeID satisfies sql.Database
func (driver *SQLXDriver) NodeID() string {
return driver.nodeID
return driver.nodeInfo.ID
}
// Close satisfies sql.Database/io.Closer

View File

@ -97,7 +97,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Run("throws error when can't connect to the database", func(t *testing.T) {
goodInfo := node.Info{GenesisBlock: "GENESIS", NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewSQLXDriver(ctx, postgres.Config{}, goodInfo)
_, err := postgres.ConnectSQLXDriver(ctx, postgres.Config{}, goodInfo)
if err == nil {
t.Fatal("Expected an error")
}
@ -109,7 +109,7 @@ func TestPostgresSQLX(t *testing.T) {
badHash := fmt.Sprintf("x %s", strings.Repeat("1", 100))
badInfo := node.Info{GenesisBlock: badHash, NetworkID: "1", ID: "x123", ClientName: "geth"}
_, err := postgres.NewSQLXDriver(ctx, pgConfig, badInfo)
_, err := postgres.ConnectSQLXDriver(ctx, pgConfig, badInfo)
if err == nil {
t.Fatal("Expected an error")
}

View File

@ -30,7 +30,7 @@ func SetupSQLXDB() (sql.Database, error) {
return nil, err
}
conf.MaxIdle = 0
driver, err := NewSQLXDriver(context.Background(), conf, node.Info{})
driver, err := ConnectSQLXDriver(context.Background(), conf, node.Info{})
if err != nil {
return nil, err
}
@ -39,7 +39,7 @@ func SetupSQLXDB() (sql.Database, error) {
// SetupPGXDB is used to setup a pgx db for tests
func SetupPGXDB(config Config) (sql.Database, error) {
driver, err := NewPGXDriver(context.Background(), config, node.Info{})
driver, err := ConnectPGXDriver(context.Background(), config, node.Info{})
if err != nil {
return nil, err
}

View File

@ -209,6 +209,30 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
return nil
}
/*
INSERT INTO eth.blob_hashes (tx_hash, index, blob_hash) VALUES ($1, $2, $3)
ON CONFLICT (tx_hash, index) DO NOTHING
*/
func (w *Writer) upsertBlobHash(tx Tx, blobHash models.BlobHashModel) error {
if w.useCopyForTx(tx) {
rows := toRows(toRow(blobHash.TxHash, blobHash.Index, blobHash.BlobHash))
_, err := tx.CopyFrom(w.db.Context(), schema.TableBlobHash.TableName(), schema.TableBlobHash.ColumnNames(), rows)
if err != nil {
return insertError{"eth.blob_hashes", err, "COPY", blobHash}
}
} else {
_, err := tx.Exec(w.db.Context(), w.db.InsertBlobHashStm(),
blobHash.TxHash,
blobHash.Index,
blobHash.BlobHash,
)
if err != nil {
return insertError{"eth.blob_hashes", err, w.db.InsertBlobHashStm(), blobHash}
}
}
return nil
}
/*
INSERT INTO eth.receipt_cids (block_number, header_id, tx_id, cid, contract, post_state, post_status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (tx_id, header_id, block_number) DO NOTHING

View File

@ -29,27 +29,39 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)
// StateDiffIndexer interface required to index statediff data
// StateDiffIndexer describes the interface for indexing state data.
type StateDiffIndexer interface {
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
CurrentBlock() (*models.HeaderModel, error)
HasBlock(hash common.Hash, number uint64) (bool, error)
// PushBlock indexes block data except for state & storage nodes: header, uncles, transactions &
// receipts. Returns an initiated DB transaction which must be committed or rolled back.
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
// PushHeader indexes a block header.
PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
// PushStateNode indexes a state node and its storage trie.
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
// PushIPLD indexes an IPLD node.
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
// BeginTx starts a new DB transaction.
BeginTx(number *big.Int, ctx context.Context) Batch
// DetectGaps returns a list of gaps in the block range, if any.
DetectGaps(beginBlock uint64, endBlock uint64) ([]*BlockGap, error)
// CurrentBlock returns the latest indexed block.
CurrentBlock() (*models.HeaderModel, error)
// HasBlock returns true if the block is indexed.
HasBlock(hash common.Hash, number uint64) (bool, error)
// Close closes the associated output DB connection or files.
Close() error
// Methods used by WatchAddress API/functionality
LoadWatchedAddresses() ([]common.Address, error)
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
RemoveWatchedAddresses(addresses []sdtypes.WatchAddressArg) error
SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error
ClearWatchedAddresses() error
Close() error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
}
// Batch required for indexing data atomically

View File

@ -38,8 +38,8 @@ func EncodeHeader(header *types.Header) (IPLD, error) {
}, nil
}
// encodeTx converts a *types.Transaction to an IPLD node
func encodeTx(tx *types.Transaction) (IPLD, error) {
// EncodeTx converts a *types.Transaction to an IPLD node
func EncodeTx(tx *types.Transaction) (IPLD, error) {
txRaw, err := tx.MarshalBinary()
if err != nil {
return nil, err
@ -54,8 +54,8 @@ func encodeTx(tx *types.Transaction) (IPLD, error) {
}, nil
}
// encodeReceipt converts a types.Receipt to an IPLD node
func encodeReceipt(receipt *types.Receipt) (IPLD, error) {
// EncodeReceipt converts a types.Receipt to an IPLD node
func EncodeReceipt(receipt *types.Receipt) (IPLD, error) {
rctRaw, err := receipt.MarshalBinary()
if err != nil {
return nil, err
@ -70,8 +70,8 @@ func encodeReceipt(receipt *types.Receipt) (IPLD, error) {
}, nil
}
// encodeLog converts a Log to an IPLD node
func encodeLog(log *types.Log) (IPLD, error) {
// EncodeLog converts a Log to an IPLD node
func EncodeLog(log *types.Log) (IPLD, error) {
logRaw, err := rlp.EncodeToBytes(log)
if err != nil {
return nil, err
@ -86,7 +86,7 @@ func encodeLog(log *types.Log) (IPLD, error) {
}, nil
}
func encodeWithdrawal(w *types.Withdrawal) (IPLD, error) {
func EncodeWithdrawal(w *types.Withdrawal) (IPLD, error) {
wRaw, err := rlp.EncodeToBytes(w)
if err != nil {
return nil, err

View File

@ -44,7 +44,7 @@ func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]IPLD
func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
var ethTxNodes []IPLD
for _, tx := range txs {
ethTx, err := encodeTx(tx)
ethTx, err := EncodeTx(tx)
if err != nil {
return nil, err
}
@ -57,7 +57,7 @@ func processTransactions(txs []*types.Transaction) ([]IPLD, error) {
func processWithdrawals(withdrawals []*types.Withdrawal) ([]IPLD, error) {
var withdrawalNodes []IPLD
for _, withdrawal := range withdrawals {
ethW, err := encodeWithdrawal(withdrawal)
ethW, err := EncodeWithdrawal(withdrawal)
if err != nil {
return nil, err
}
@ -80,7 +80,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
return nil, nil, err
}
ethRct, err := encodeReceipt(rct)
ethRct, err := EncodeReceipt(rct)
if err != nil {
return nil, nil, err
}
@ -95,7 +95,7 @@ func processReceiptsAndLogs(rcts []*types.Receipt) ([]IPLD, [][]IPLD, error) {
func processLogs(logs []*types.Log) ([]IPLD, error) {
logNodes := make([]IPLD, len(logs))
for idx, log := range logs {
logNode, err := encodeLog(log)
logNode, err := EncodeLog(log)
if err != nil {
return nil, err
}

View File

@ -50,7 +50,7 @@ var (
// canonical block at London height
// includes 5 transactions: 3 Legacy + 1 EIP-2930 + 1 EIP-1559
MockHeader = types.Header{
Time: 0,
Time: BlockTime,
Number: new(big.Int).Set(BlockNumber),
Root: common.HexToHash("0x0"),
TxHash: common.HexToHash("0x0"),
@ -434,13 +434,15 @@ func createLegacyTransactionsAndReceipts(config *params.ChainConfig, blockNumber
return types.Transactions{signedTrx1, signedTrx2, signedTrx3}, types.Receipts{mockReceipt1, mockReceipt2, mockReceipt3}, senderAddr
}
// createTransactionsAndReceipts is a helper function to generate signed mock transactions and mock receipts with mock logs
// createTransactionsAndReceipts generates signed mock transactions and mock receipts with mock logs, and returns the address of the sender with them.
func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.Int, blockTime uint64) (types.Transactions, types.Receipts, common.Address) {
const txCount = 6
// make transactions
trx1 := types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), []byte{})
trx2 := types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), []byte{})
trx3 := types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), MockContractByteCode)
trx4 := types.NewTx(&types.AccessListTx{
txs := make(types.Transactions, txCount)
txs[0] = types.NewTransaction(0, Address, big.NewInt(1000), 50, big.NewInt(100), []byte{})
txs[1] = types.NewTransaction(1, AnotherAddress, big.NewInt(2000), 100, big.NewInt(200), []byte{})
txs[2] = types.NewContractCreation(2, big.NewInt(1500), 75, big.NewInt(150), MockContractByteCode)
txs[3] = types.NewTx(&types.AccessListTx{
ChainID: config.ChainID,
Nonce: 0,
GasPrice: big.NewInt(100),
@ -453,7 +455,7 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
AccessListEntry2,
},
})
trx5 := types.NewTx(&types.DynamicFeeTx{
txs[4] = types.NewTx(&types.DynamicFeeTx{
ChainID: config.ChainID,
Nonce: 0,
GasTipCap: big.NewInt(100),
@ -467,6 +469,20 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
AccessListEntry2,
},
})
txs[5] = types.NewTx(&types.BlobTx{
ChainID: uint256.MustFromBig(config.ChainID),
Nonce: 0,
GasTipCap: uint256.NewInt(100),
GasFeeCap: uint256.NewInt(100),
Gas: 50,
To: AnotherAddress,
Value: uint256.NewInt(0),
BlobFeeCap: uint256.NewInt(1e6),
BlobHashes: []common.Hash{
common.HexToHash("0x0100000000000000000000000000000000000000000000000000000000000001"),
common.HexToHash("0x0100000000000000000000000000000000000000000000000000000000000002"),
},
})
transactionSigner := types.MakeSigner(config, blockNumber, blockTime)
mockCurve := elliptic.P256()
@ -474,60 +490,56 @@ func createTransactionsAndReceipts(config *params.ChainConfig, blockNumber *big.
if err != nil {
log.Crit(err.Error())
}
signedTrx1, err := types.SignTx(trx1, transactionSigner, mockPrvKey)
var signedTxs types.Transactions
for _, tx := range txs {
signed, err := types.SignTx(tx, transactionSigner, mockPrvKey)
if err != nil {
log.Crit(err.Error())
}
signedTrx2, err := types.SignTx(trx2, transactionSigner, mockPrvKey)
if err != nil {
log.Crit(err.Error())
signedTxs = append(signedTxs, signed)
}
signedTrx3, err := types.SignTx(trx3, transactionSigner, mockPrvKey)
if err != nil {
log.Crit(err.Error())
}
signedTrx4, err := types.SignTx(trx4, transactionSigner, mockPrvKey)
if err != nil {
log.Crit(err.Error())
}
signedTrx5, err := types.SignTx(trx5, transactionSigner, mockPrvKey)
if err != nil {
log.Crit(err.Error())
}
senderAddr, err := types.Sender(transactionSigner, signedTrx1) // same for both trx
senderAddr, err := types.Sender(transactionSigner, signedTxs[0]) // same for both trx
if err != nil {
log.Crit(err.Error())
}
// make receipts
mockReceipt1 := types.NewReceipt(nil, false, 50)
mockReceipt1.Logs = []*types.Log{MockLog1}
mockReceipt1.TxHash = signedTrx1.Hash()
mockReceipt2 := types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100)
mockReceipt2.Logs = []*types.Log{MockLog2, ShortLog1}
mockReceipt2.TxHash = signedTrx2.Hash()
mockReceipt3 := types.NewReceipt(common.HexToHash("0x2").Bytes(), false, 75)
mockReceipt3.Logs = []*types.Log{}
mockReceipt3.TxHash = signedTrx3.Hash()
mockReceipt4 := &types.Receipt{
receipts := make(types.Receipts, txCount)
receipts[0] = types.NewReceipt(nil, false, 50)
receipts[0].Logs = []*types.Log{MockLog1}
receipts[0].TxHash = signedTxs[0].Hash()
receipts[1] = types.NewReceipt(common.HexToHash("0x1").Bytes(), false, 100)
receipts[1].Logs = []*types.Log{MockLog2, ShortLog1}
receipts[1].TxHash = signedTxs[1].Hash()
receipts[2] = types.NewReceipt(common.HexToHash("0x2").Bytes(), false, 75)
receipts[2].Logs = []*types.Log{}
receipts[2].TxHash = signedTxs[2].Hash()
receipts[3] = &types.Receipt{
Type: types.AccessListTxType,
PostState: common.HexToHash("0x3").Bytes(),
Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 175,
Logs: []*types.Log{MockLog3, MockLog4, ShortLog2},
TxHash: signedTrx4.Hash(),
TxHash: signedTxs[3].Hash(),
}
mockReceipt5 := &types.Receipt{
receipts[4] = &types.Receipt{
Type: types.DynamicFeeTxType,
PostState: common.HexToHash("0x3").Bytes(),
Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 175,
Logs: []*types.Log{},
TxHash: signedTrx5.Hash(),
TxHash: signedTxs[4].Hash(),
}
receipts[5] = &types.Receipt{
Type: types.BlobTxType,
PostState: common.HexToHash("0x3").Bytes(),
Status: types.ReceiptStatusSuccessful,
CumulativeGasUsed: 175,
Logs: []*types.Log{},
TxHash: signedTxs[5].Hash(),
}
return types.Transactions{signedTrx1, signedTrx2, signedTrx3, signedTrx4, signedTrx5}, types.Receipts{mockReceipt1, mockReceipt2, mockReceipt3, mockReceipt4, mockReceipt5}, senderAddr
return signedTxs, receipts, senderAddr
}
// createNonCanonicalBlockReceipts is a helper function to generate mock receipts with mock logs for non-canonical blocks

View File

@ -16,7 +16,10 @@
package models
import "github.com/lib/pq"
import (
"github.com/ethereum/go-ethereum/common"
"github.com/lib/pq"
)
// IPLDModel is the db model for ipld.blocks
type IPLDModel struct {
@ -130,3 +133,10 @@ type WithdrawalModel struct {
Address string `db:"address"`
Amount uint64 `db:"amount"`
}
// BlobHashModel is the DB model for eth.blob_hashes
type BlobHashModel struct {
TxHash string `db:"tx_hash"`
Index uint64 `db:"index"`
BlobHash common.Hash `db:"blob_hash"`
}

View File

@ -16,7 +16,7 @@
package schema
var Tables = []*Table{
var EthTables = []*Table{
&TableIPLDBlock,
&TableNodeInfo,
&TableHeader,
@ -27,8 +27,14 @@ var Tables = []*Table{
&TableReceipt,
&TableLog,
&TableWithdrawal,
&TableBlobHash,
}
var AllTables = append(
EthTables,
&TableWatchedAddresses,
)
var TableIPLDBlock = Table{
Name: `ipld.blocks`,
Columns: []Column{
@ -194,6 +200,15 @@ var TableWithdrawal = Table{
UpsertClause: OnConflict("block_number", "header_id", "index"),
}
var TableBlobHash = Table{
Name: "eth.blob_hashes",
Columns: []Column{
{Name: "tx_hash", Type: Dvarchar},
{Name: "index", Type: Dinteger},
{Name: "blob_hash", Type: Dbytea},
},
}
var TableWatchedAddresses = Table{
Name: "eth_meta.watched_addresses",
Columns: []Column{

View File

@ -109,16 +109,14 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if err != nil {
t.Fatal(err)
}
require.Equal(t, 5, len(trxs))
require.Contains(t, trxs, trx1CID.String())
require.Contains(t, trxs, trx2CID.String())
require.Contains(t, trxs, trx3CID.String())
require.Contains(t, trxs, trx4CID.String())
require.Contains(t, trxs, trx5CID.String())
require.Equal(t, len(txCIDs), len(trxs))
for _, c := range txCIDs {
require.Contains(t, trxs, c.String())
}
transactions := mocks.MockBlock.Transactions()
type txResult struct {
TxType uint8 `db:"tx_type"`
TxType int `db:"tx_type"`
Value string
}
for _, c := range trxs {
@ -132,9 +130,11 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
t.Fatal(err)
}
txTypeAndValueStr := `SELECT tx_type, CAST(value as TEXT) FROM eth.transaction_cids WHERE cid = $1`
txBlobHashQuery := `SELECT blob_hash FROM eth.blob_hashes WHERE tx_hash = $1`
txBlobIndexQuery := `SELECT index FROM eth.blob_hashes WHERE tx_hash = $1`
switch c {
case trx1CID.String():
require.Equal(t, tx1, data)
case txCIDs[0].String():
require.Equal(t, encodedTxs[0], data)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
@ -146,8 +146,8 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if txRes.Value != transactions[0].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[0].Value().String(), txRes.Value)
}
case trx2CID.String():
require.Equal(t, tx2, data)
case txCIDs[1].String():
require.Equal(t, encodedTxs[1], data)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
@ -159,8 +159,8 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if txRes.Value != transactions[1].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[1].Value().String(), txRes.Value)
}
case trx3CID.String():
require.Equal(t, tx3, data)
case txCIDs[2].String():
require.Equal(t, encodedTxs[2], data)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
@ -172,8 +172,8 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if txRes.Value != transactions[2].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[2].Value().String(), txRes.Value)
}
case trx4CID.String():
require.Equal(t, tx4, data)
case txCIDs[3].String():
require.Equal(t, encodedTxs[3], data)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
@ -185,8 +185,8 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if txRes.Value != transactions[3].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[3].Value().String(), txRes.Value)
}
case trx5CID.String():
require.Equal(t, tx5, data)
case txCIDs[4].String():
require.Equal(t, encodedTxs[4], data)
txRes := new(txResult)
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
@ -198,6 +198,28 @@ func DoTestPublishAndIndexTransactionIPLDs(t *testing.T, db sql.Database) {
if txRes.Value != transactions[4].Value().String() {
t.Fatalf("expected tx value %s got %s", transactions[4].Value().String(), txRes.Value)
}
case txCIDs[5].String():
require.Equal(t, encodedTxs[5], data)
var txRes txResult
err = db.QueryRow(context.Background(), txTypeAndValueStr, c).Scan(&txRes.TxType, &txRes.Value)
if err != nil {
t.Fatal(err)
}
require.Equal(t, types.BlobTxType, txRes.TxType)
require.Equal(t, transactions[5].Value().String(), txRes.Value)
var txBlobHashes []common.Hash
var txBlobIndices []uint64
err = db.Select(context.Background(), &txBlobHashes, txBlobHashQuery, transactions[5].Hash().String())
if err != nil {
t.Fatal(err)
}
require.Equal(t, transactions[5].BlobHashes(), txBlobHashes)
err = db.Select(context.Background(), &txBlobIndices, txBlobIndexQuery, transactions[5].Hash().String())
if err != nil {
t.Fatal(err)
}
require.Equal(t, []uint64{0, 1}, txBlobIndices)
}
}
}
@ -255,12 +277,10 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) {
if err != nil {
t.Fatal(err)
}
require.Equal(t, 5, len(rcts))
require.Contains(t, rcts, rct1CID.String())
require.Contains(t, rcts, rct2CID.String())
require.Contains(t, rcts, rct3CID.String())
require.Contains(t, rcts, rct4CID.String())
require.Contains(t, rcts, rct5CID.String())
require.Equal(t, len(rctCIDs), len(rcts))
for _, c := range rctCIDs {
require.Contains(t, rcts, c.String())
}
for idx, c := range rcts {
result := make([]models.IPLDModel, 0)
@ -289,8 +309,8 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) {
postStatePgStr := `SELECT post_state FROM eth.receipt_cids WHERE cid = $1`
switch c {
case rct1CID.String():
require.Equal(t, rct1, data)
case rctCIDs[0].String():
require.Equal(t, encodedRcts[0], data)
var postStatus uint64
pgStr = `SELECT post_status FROM eth.receipt_cids WHERE cid = $1`
err = db.Get(context.Background(), &postStatus, pgStr, c)
@ -298,32 +318,40 @@ func DoTestPublishAndIndexReceiptIPLDs(t *testing.T, db sql.Database) {
t.Fatal(err)
}
require.Equal(t, mocks.ExpectedPostStatus, postStatus)
case rct2CID.String():
require.Equal(t, rct2, data)
case rctCIDs[1].String():
require.Equal(t, encodedRcts[1], data)
var postState string
err = db.Get(context.Background(), &postState, postStatePgStr, c)
if err != nil {
t.Fatal(err)
}
require.Equal(t, mocks.ExpectedPostState1, postState)
case rct3CID.String():
require.Equal(t, rct3, data)
case rctCIDs[2].String():
require.Equal(t, encodedRcts[2], data)
var postState string
err = db.Get(context.Background(), &postState, postStatePgStr, c)
if err != nil {
t.Fatal(err)
}
require.Equal(t, mocks.ExpectedPostState2, postState)
case rct4CID.String():
require.Equal(t, rct4, data)
case rctCIDs[3].String():
require.Equal(t, encodedRcts[3], data)
var postState string
err = db.Get(context.Background(), &postState, postStatePgStr, c)
if err != nil {
t.Fatal(err)
}
require.Equal(t, mocks.ExpectedPostState3, postState)
case rct5CID.String():
require.Equal(t, rct5, data)
case rctCIDs[4].String():
require.Equal(t, encodedRcts[4], data)
var postState string
err = db.Get(context.Background(), &postState, postStatePgStr, c)
if err != nil {
t.Fatal(err)
}
require.Equal(t, mocks.ExpectedPostState3, postState)
case rctCIDs[5].String():
require.Equal(t, encodedRcts[5], data)
var postState string
err = db.Get(context.Background(), &postState, postStatePgStr, c)
if err != nil {
@ -723,62 +751,19 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
// expected transactions in the canonical block
mockBlockTxs := mocks.MockBlock.Transactions()
expectedBlockTxs := []models.TxModel{
{
expectedBlockTxs := make([]models.TxModel, len(mockBlockTxs))
for i, tx := range mockBlockTxs {
expectedBlockTxs[i] = models.TxModel{
BlockNumber: mockBlock.Number().String(),
HeaderID: mockBlock.Hash().String(),
TxHash: mockBlockTxs[0].Hash().String(),
CID: trx1CID.String(),
Dst: shared.HandleZeroAddrPointer(mockBlockTxs[0].To()),
TxHash: tx.Hash().String(),
CID: txCIDs[i].String(),
Dst: shared.HandleZeroAddrPointer(tx.To()),
Src: mocks.SenderAddr.String(),
Index: 0,
Type: mockBlockTxs[0].Type(),
Value: mockBlockTxs[0].Value().String(),
},
{
BlockNumber: mockBlock.Number().String(),
HeaderID: mockBlock.Hash().String(),
TxHash: mockBlockTxs[1].Hash().String(),
CID: trx2CID.String(),
Dst: shared.HandleZeroAddrPointer(mockBlockTxs[1].To()),
Src: mocks.SenderAddr.String(),
Index: 1,
Type: mockBlockTxs[1].Type(),
Value: mockBlockTxs[1].Value().String(),
},
{
BlockNumber: mockBlock.Number().String(),
HeaderID: mockBlock.Hash().String(),
TxHash: mockBlockTxs[2].Hash().String(),
CID: trx3CID.String(),
Dst: shared.HandleZeroAddrPointer(mockBlockTxs[2].To()),
Src: mocks.SenderAddr.String(),
Index: 2,
Type: mockBlockTxs[2].Type(),
Value: mockBlockTxs[2].Value().String(),
},
{
BlockNumber: mockBlock.Number().String(),
HeaderID: mockBlock.Hash().String(),
TxHash: mockBlockTxs[3].Hash().String(),
CID: trx4CID.String(),
Dst: shared.HandleZeroAddrPointer(mockBlockTxs[3].To()),
Src: mocks.SenderAddr.String(),
Index: 3,
Type: mockBlockTxs[3].Type(),
Value: mockBlockTxs[3].Value().String(),
},
{
BlockNumber: mockBlock.Number().String(),
HeaderID: mockBlock.Hash().String(),
TxHash: mockBlockTxs[4].Hash().String(),
CID: trx5CID.String(),
Dst: shared.HandleZeroAddrPointer(mockBlockTxs[4].To()),
Src: mocks.SenderAddr.String(),
Index: 4,
Type: mockBlockTxs[4].Type(),
Value: mockBlockTxs[4].Value().String(),
},
Index: int64(i),
Type: tx.Type(),
Value: tx.Value().String(),
}
}
// expected transactions in the non-canonical block at London height
@ -788,7 +773,7 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
BlockNumber: mockNonCanonicalBlock.Number().String(),
HeaderID: mockNonCanonicalBlock.Hash().String(),
TxHash: mockNonCanonicalBlockTxs[0].Hash().String(),
CID: trx2CID.String(),
CID: txCIDs[1].String(),
Dst: mockNonCanonicalBlockTxs[0].To().String(),
Src: mocks.SenderAddr.String(),
Index: 0,
@ -799,7 +784,7 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
BlockNumber: mockNonCanonicalBlock.Number().String(),
HeaderID: mockNonCanonicalBlock.Hash().String(),
TxHash: mockNonCanonicalBlockTxs[1].Hash().String(),
CID: trx5CID.String(),
CID: txCIDs[4].String(),
Dst: mockNonCanonicalBlockTxs[1].To().String(),
Src: mocks.SenderAddr.String(),
Index: 1,
@ -815,7 +800,7 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
BlockNumber: mockNonCanonicalBlock2.Number().String(),
HeaderID: mockNonCanonicalBlock2.Hash().String(),
TxHash: mockNonCanonicalBlock2Txs[0].Hash().String(),
CID: trx3CID.String(),
CID: txCIDs[2].String(),
Dst: "",
Src: mocks.SenderAddr.String(),
Index: 0,
@ -826,7 +811,7 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
BlockNumber: mockNonCanonicalBlock2.Number().String(),
HeaderID: mockNonCanonicalBlock2.Hash().String(),
TxHash: mockNonCanonicalBlock2Txs[1].Hash().String(),
CID: trx5CID.String(),
CID: txCIDs[4].String(),
Dst: mockNonCanonicalBlock2Txs[1].To().String(),
Src: mocks.SenderAddr.String(),
Index: 1,
@ -863,14 +848,12 @@ func DoTestPublishAndIndexTransactionsNonCanonical(t *testing.T, db sql.Database
// check indexed IPLD blocks
var data []byte
txCIDs := []cid.Cid{trx1CID, trx2CID, trx3CID, trx4CID, trx5CID}
txRLPs := [][]byte{tx1, tx2, tx3, tx4, tx5}
for i, txCID := range txCIDs {
err = db.Get(context.Background(), &data, ipfsPgGet, txCID.String(), mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
require.Equal(t, txRLPs[i], data)
require.Equal(t, encodedTxs[i], data)
}
}
@ -886,11 +869,10 @@ func DoTestPublishAndIndexReceiptsNonCanonical(t *testing.T, db sql.Database) {
}
// expected receipts in the canonical block
rctCids := []cid.Cid{rct1CID, rct2CID, rct3CID, rct4CID, rct5CID}
expectedBlockRctsMap := make(map[string]models.ReceiptModel, len(mocks.MockReceipts))
for i, mockBlockRct := range mocks.MockReceipts {
rctModel := createRctModel(mockBlockRct, rctCids[i], mockBlock.Number().String())
expectedBlockRctsMap[rctCids[i].String()] = rctModel
rctModel := createRctModel(mockBlockRct, rctCIDs[i], mockBlock.Number().String())
expectedBlockRctsMap[rctCIDs[i].String()] = rctModel
}
// expected receipts in the non-canonical block at London height
@ -945,10 +927,8 @@ func DoTestPublishAndIndexReceiptsNonCanonical(t *testing.T, db sql.Database) {
// check indexed rct IPLD blocks
var data []byte
rctRLPs := [][]byte{
rct1, rct2, rct3, rct4, rct5, nonCanonicalBlockRct1, nonCanonicalBlockRct2,
}
for i, rctCid := range append(rctCids, nonCanonicalBlockRctCids...) {
rctRLPs := append(encodedRcts, nonCanonicalBlockRct1, nonCanonicalBlockRct2)
for i, rctCid := range append(rctCIDs, nonCanonicalBlockRctCids...) {
err = db.Get(context.Background(), &data, ipfsPgGet, rctCid.String(), mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)

View File

@ -37,14 +37,13 @@ var (
WHERE key = $1 AND block_number = $2`
watchedAddressesPgGet = `SELECT *
FROM eth_meta.watched_addresses`
tx1, tx2, tx3, tx4, tx5, rct1, rct2, rct3, rct4, rct5 []byte
encodedTxs, encodedRcts [][]byte
wd1, wd2 []byte
nonCanonicalBlockRct1, nonCanonicalBlockRct2 []byte
nonCanonicalBlock2Rct1, nonCanonicalBlock2Rct2 []byte
mockBlock, mockNonCanonicalBlock, mockNonCanonicalBlock2 *types.Block
headerCID, mockNonCanonicalHeaderCID, mockNonCanonicalHeader2CID cid.Cid
trx1CID, trx2CID, trx3CID, trx4CID, trx5CID cid.Cid
rct1CID, rct2CID, rct3CID, rct4CID, rct5CID cid.Cid
txCIDs, rctCIDs []cid.Cid
wd1CID, wd2CID cid.Cid
nonCanonicalBlockRct1CID, nonCanonicalBlockRct2CID cid.Cid
nonCanonicalBlock2Rct1CID, nonCanonicalBlock2Rct2CID cid.Cid
@ -64,60 +63,28 @@ func init() {
mockNonCanonicalBlock2 = mocks.MockNonCanonicalBlock2
nonCanonicalBlock2Rcts := mocks.MockNonCanonicalBlock2Receipts
// encode mock receipts
// encode mock txs and receipts
buf := new(bytes.Buffer)
txs.EncodeIndex(0, buf)
tx1 = make([]byte, buf.Len())
copy(tx1, buf.Bytes())
buf.Reset()
encodedTxs = make([][]byte, len(txs))
encodedRcts = make([][]byte, len(rcts))
txs.EncodeIndex(1, buf)
tx2 = make([]byte, buf.Len())
copy(tx2, buf.Bytes())
for i := 0; i < len(txs); i++ {
txs.EncodeIndex(i, buf)
tx := make([]byte, buf.Len())
copy(tx, buf.Bytes())
buf.Reset()
encodedTxs[i] = tx
}
txs.EncodeIndex(2, buf)
tx3 = make([]byte, buf.Len())
copy(tx3, buf.Bytes())
buf.Reset()
txs.EncodeIndex(3, buf)
tx4 = make([]byte, buf.Len())
copy(tx4, buf.Bytes())
buf.Reset()
txs.EncodeIndex(4, buf)
tx5 = make([]byte, buf.Len())
copy(tx5, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(0, buf)
rct1 = make([]byte, buf.Len())
copy(rct1, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(1, buf)
rct2 = make([]byte, buf.Len())
copy(rct2, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(2, buf)
rct3 = make([]byte, buf.Len())
copy(rct3, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(3, buf)
rct4 = make([]byte, buf.Len())
copy(rct4, buf.Bytes())
buf.Reset()
rcts.EncodeIndex(4, buf)
rct5 = make([]byte, buf.Len())
copy(rct5, buf.Bytes())
for i := 0; i < len(rcts); i++ {
rcts.EncodeIndex(i, buf)
rct := make([]byte, buf.Len())
copy(rct, buf.Bytes())
buf.Reset()
encodedRcts[i] = rct
}
// encode mock withdrawals
// wds
mocks.MockWithdrawals.EncodeIndex(0, buf)
wd1 = make([]byte, buf.Len())
copy(wd1, buf.Bytes())
@ -152,19 +119,20 @@ func init() {
headerCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockHeaderRlp, multihash.KECCAK_256)
mockNonCanonicalHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockNonCanonicalHeaderRlp, multihash.KECCAK_256)
mockNonCanonicalHeader2CID, _ = ipld.RawdataToCid(ipld.MEthHeader, mocks.MockNonCanonicalHeader2Rlp, multihash.KECCAK_256)
trx1CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx1, multihash.KECCAK_256)
trx2CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx2, multihash.KECCAK_256)
trx3CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx3, multihash.KECCAK_256)
trx4CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx4, multihash.KECCAK_256)
trx5CID, _ = ipld.RawdataToCid(ipld.MEthTx, tx5, multihash.KECCAK_256)
for i := 0; i < len(txs); i++ {
tx, _ := ipld.RawdataToCid(ipld.MEthTx, encodedTxs[i], multihash.KECCAK_256)
txCIDs = append(txCIDs, tx)
}
state1CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.ContractLeafNode, multihash.KECCAK_256)
state2CID, _ = ipld.RawdataToCid(ipld.MEthStateTrie, mocks.AccountLeafNode, multihash.KECCAK_256)
storageCID, _ = ipld.RawdataToCid(ipld.MEthStorageTrie, mocks.StorageLeafNode, multihash.KECCAK_256)
rct1CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct1, multihash.KECCAK_256)
rct2CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct2, multihash.KECCAK_256)
rct3CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct3, multihash.KECCAK_256)
rct4CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct4, multihash.KECCAK_256)
rct5CID, _ = ipld.RawdataToCid(ipld.MEthTxReceipt, rct5, multihash.KECCAK_256)
for i := 0; i < len(rcts); i++ {
rct, _ := ipld.RawdataToCid(ipld.MEthTxReceipt, encodedRcts[i], multihash.KECCAK_256)
rctCIDs = append(rctCIDs, rct)
}
wd1CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd1, multihash.KECCAK_256)
wd2CID, _ = ipld.RawdataToCid(ipld.MEthWithdrawal, wd2, multihash.KECCAK_256)

View File

@ -19,10 +19,14 @@ package test_helpers
import (
"bufio"
"context"
"fmt"
"os"
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql/postgres"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
"github.com/jmoiron/sqlx"
)
// DedupFile removes duplicates from the given file
@ -39,9 +43,6 @@ func DedupFile(filePath string) error {
s := sc.Text()
stmts[s] = struct{}{}
}
if err != nil {
return err
}
f.Close()
@ -60,31 +61,30 @@ func DedupFile(filePath string) error {
// TearDownDB is used to tear down the watcher dbs after tests
func TearDownDB(t *testing.T, db sql.Database) {
ctx := context.Background()
tx, err := db.Begin(ctx)
err := ClearDB(db)
if err != nil {
t.Fatal(err)
}
}
statements := []string{
`TRUNCATE nodes`,
`TRUNCATE ipld.blocks`,
`TRUNCATE eth.header_cids`,
`TRUNCATE eth.uncle_cids`,
`TRUNCATE eth.transaction_cids`,
`TRUNCATE eth.receipt_cids`,
`TRUNCATE eth.state_cids`,
`TRUNCATE eth.storage_cids`,
`TRUNCATE eth.log_cids`,
`TRUNCATE eth.withdrawal_cids`,
`TRUNCATE eth_meta.watched_addresses`,
func ClearSqlxDB(sqlxdb *sqlx.DB) error {
driver := postgres.NewSQLXDriver(context.Background(), sqlxdb)
db := postgres.NewPostgresDB(driver, false)
return ClearDB(db)
}
for _, stm := range statements {
func ClearDB(db sql.Database) error {
ctx := context.Background()
tx, err := db.Begin(ctx)
if err != nil {
return err
}
for _, tbl := range schema.AllTables {
stm := fmt.Sprintf("TRUNCATE %s", tbl.Name)
if _, err = tx.Exec(ctx, stm); err != nil {
t.Fatal(err)
return fmt.Errorf("error executing `%s`: %w", stm, err)
}
}
if err = tx.Commit(ctx); err != nil {
t.Fatal(err)
}
return tx.Commit(ctx)
}

View File

@ -3,7 +3,7 @@ services:
restart: on-failure
depends_on:
- ipld-eth-db
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.3.0-alpha
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.4.0-alpha
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "cerc_testing"

View File

@ -1,20 +0,0 @@
version: "1.2"
name: fixturenet-plugeth-tx
description: "Plugeth Ethereum Fixturenet for testing plugeth-statediff"
repos:
- git.vdb.to/cerc-io/plugeth@v1.13.14-cerc-2
- git.vdb.to/cerc-io/plugeth-statediff
- git.vdb.to/cerc-io/lighthouse
- git.vdb.to/cerc-io/ipld-eth-db@v5.3.0-alpha
containers:
- cerc/plugeth-statediff
- cerc/plugeth
- cerc/fixturenet-eth-genesis
- cerc/fixturenet-plugeth-plugeth
- cerc/lighthouse
- cerc/lighthouse-cli
- cerc/fixturenet-eth-lighthouse
- cerc/ipld-eth-db
pods:
- fixturenet-plugeth
- ipld-eth-db

View File

@ -12,14 +12,15 @@ import (
"testing"
"github.com/cerc-io/eth-iterator-utils/tracker"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
)
var subtrieCounts = []uint{1, 8, 32}

View File

@ -11,8 +11,10 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/kzg4844"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
)
const secondsPerBlock = 12
@ -148,3 +150,58 @@ func (gen *GenContext) createTx(from common.Address, to *common.Address, amount
}
return types.SignTx(tx, signer, priv)
}
func (gen *GenContext) createBlobTx(
from common.Address,
to common.Address,
amount *uint256.Int,
gasLimit uint64,
blobData []byte,
) (*types.Transaction, error) {
signer := types.MakeSigner(gen.ChainConfig, gen.block.Number(), gen.time)
nonce := gen.block.TxNonce(from)
priv, ok := gen.Keys[from]
if !ok {
return nil, errors.New("No private key for sender address" + from.String())
}
if !gen.ChainConfig.IsCancun(gen.block.Number(), gen.time) {
return nil, errors.New("blob tx is only supported from Cancun fork")
}
sidecar := MakeSidecar([][]byte{blobData})
tx := types.NewTx(&types.BlobTx{
ChainID: uint256.MustFromBig(gen.ChainConfig.ChainID),
Nonce: nonce,
To: to,
Gas: gasLimit,
GasTipCap: uint256.NewInt(50),
GasFeeCap: uint256.NewInt(1000000000),
Value: amount,
BlobFeeCap: uint256.NewInt(1000000),
BlobHashes: sidecar.BlobHashes(),
Sidecar: sidecar,
})
return types.SignTx(tx, signer, priv)
}
// From go-ethereum/cmd/devp2p/internal/ethtest/chain.go
func MakeSidecar(data [][]byte) *types.BlobTxSidecar {
var (
blobs = make([]kzg4844.Blob, len(data))
commitments []kzg4844.Commitment
proofs []kzg4844.Proof
)
for i := range blobs {
copy(blobs[i][:], data[i])
c, _ := kzg4844.BlobToCommitment(blobs[i])
p, _ := kzg4844.ComputeBlobProof(blobs[i], c)
commitments = append(commitments, c)
proofs = append(proofs, p)
}
return &types.BlobTxSidecar{
Blobs: blobs,
Commitments: commitments,
Proofs: proofs,
}
}

View File

@ -1,34 +0,0 @@
package test_helpers
import (
"fmt"
"github.com/jmoiron/sqlx"
)
// ClearDB is used to empty the IPLD-ETH tables after tests
func ClearDB(db *sqlx.DB) error {
tx, err := db.Beginx()
if err != nil {
return err
}
statements := []string{
`TRUNCATE nodes`,
`TRUNCATE ipld.blocks`,
`TRUNCATE eth.header_cids`,
`TRUNCATE eth.uncle_cids`,
`TRUNCATE eth.transaction_cids`,
`TRUNCATE eth.receipt_cids`,
`TRUNCATE eth.state_cids`,
`TRUNCATE eth.storage_cids`,
`TRUNCATE eth.log_cids`,
`TRUNCATE eth.withdrawal_cids`,
`TRUNCATE eth_meta.watched_addresses`,
}
for _, stm := range statements {
if _, err = tx.Exec(stm); err != nil {
return fmt.Errorf("error executing `%s`: %w", stm, err)
}
}
return tx.Commit()
}

View File

@ -5,15 +5,16 @@ import (
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
)
type IndexChainParams struct {