From 14208b281f643644a3f86be88ded86b79c8feaf4 Mon Sep 17 00:00:00 2001 From: i-norden Date: Fri, 10 Sep 2021 13:23:06 -0500 Subject: [PATCH 1/5] new cli flag for initializing db first time service is ran --- cmd/geth/config.go | 3 +++ cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 4 ++++ statediff/types.go | 1 + 5 files changed, 10 insertions(+) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index e884a4d42..cbe0df5a0 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -187,6 +187,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } + if ctx.GlobalIsSet(utils.StateDiffDBInitFlag.Name) { + dbParams.Init = ctx.GlobalBool(utils.StateDiffDBInitFlag.Name) + } } else { if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { utils.Fatalf("Must pass DB parameters if enabling statediff write loop") diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 1e4f7497c..5e8132d2e 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -150,6 +150,7 @@ var ( utils.MinerNotifyFullFlag, utils.StateDiffFlag, utils.StateDiffDBFlag, + utils.StateDiffDBInitFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 8112a675f..fb969ca31 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -232,6 +232,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ Flags: []cli.Flag{ utils.StateDiffFlag, utils.StateDiffDBFlag, + utils.StateDiffDBInitFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 9e6b963fd..899595868 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -785,6 +785,10 @@ var ( Name: "statediff.db", Usage: "PostgreSQL database connection string for writing state diffs", } + StateDiffDBInitFlag = cli.BoolFlag{ + Name: "statediff.db.init", + Usage: "Whether or not the statediff database needs to be initialized; set true the first time this process is ran on a given database", + } StateDiffDBNodeIDFlag = cli.StringFlag{ Name: "statediff.dbnodeid", Usage: "Node ID to use when writing state diffs to database", diff --git a/statediff/types.go b/statediff/types.go index 148567dd7..d56922c2f 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -39,6 +39,7 @@ type DBParams struct { ConnectionURL string ID string ClientName string + Init bool } // Params is used to carry in parameters from subscribing/requesting clients configuration -- 2.45.2 From 53ede0413a0d8360eb5fcec63674f7cb817468ea Mon Sep 17 00:00:00 2001 From: i-norden Date: Fri, 10 Sep 2021 13:23:41 -0500 Subject: [PATCH 2/5] only write Removed node ipld block (on db init) and reuse constant cid and mhkey --- statediff/README.md | 3 +- statediff/builder_test.go | 1 + statediff/indexer/helpers.go | 14 +---- statediff/indexer/indexer.go | 67 +++++++++++++++++++----- statediff/indexer/indexer_legacy_test.go | 3 +- statediff/indexer/indexer_test.go | 3 +- statediff/indexer/shared/functions.go | 16 ++++-- statediff/service.go | 5 +- statediff/types/types.go | 20 ++++++- 9 files changed, 97 insertions(+), 35 deletions(-) diff --git a/statediff/README.md b/statediff/README.md index 0d8163e02..74c82f2d2 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -81,6 +81,7 @@ This service introduces a CLI flag namespace `statediff` `--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database `--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database `--statediff.db` is the connection string for the Postgres database to write to +`--statediff.db.init` indicates whether we need to initialize a new database; set true if its the first time running the process on a given database `--statediff.dbnodeid` is the node id to use in the Postgres database `--statediff.dbclientname` is the client name to use in the Postgres database @@ -88,7 +89,7 @@ The service can only operate in full sync mode (`--syncmode=full`), but only the e.g. ` -./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName} +./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.db.init=true --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName} ` ### RPC endpoints diff --git a/statediff/builder_test.go b/statediff/builder_test.go index a99a39664..916652256 100644 --- a/statediff/builder_test.go +++ b/statediff/builder_test.go @@ -1485,6 +1485,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { NodeType: sdtypes.Removed, LeafKey: contractLeafKey, NodeValue: []byte{}, + StorageNodes: emptyStorage, }, { Path: []byte{'\x0c'}, diff --git a/statediff/indexer/helpers.go b/statediff/indexer/helpers.go index bb62fd079..0b3a9287d 100644 --- a/statediff/indexer/helpers.go +++ b/statediff/indexer/helpers.go @@ -23,19 +23,9 @@ import ( "github.com/ethereum/go-ethereum/statediff/types" ) +// ResolveFromNodeType wrapper around NodeType.Int() so that we maintain backwards compatability func ResolveFromNodeType(nodeType types.NodeType) int { - switch nodeType { - case types.Branch: - return 0 - case types.Extension: - return 1 - case types.Leaf: - return 2 - case types.Removed: - return 3 - default: - return -1 - } + return nodeType.Int() } // ChainConfig returns the appropriate ethereum chain config for the provided chain id diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 761bd3c29..7f6fe0dd1 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -47,6 +47,12 @@ var ( dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry) ) +const ( + removedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" + removedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" + removedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A" +) + // Indexer interface to allow substitution of mocks for testing type Indexer interface { PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) @@ -59,14 +65,21 @@ type Indexer interface { type StateDiffIndexer struct { chainConfig *params.ChainConfig dbWriter *PostgresCIDWriter + init bool } // NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface -func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) *StateDiffIndexer { +func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB, init bool) (*StateDiffIndexer, error) { + // If this is the first time writing to this db, write the public.blocks entry for an empty node (for Removed state and storage node types) + if init { + if err := shared.PublishDirectWithDB(db, removedNodeMhKey, []byte{}); err != nil { + return nil, err + } + } return &StateDiffIndexer{ chainConfig: chainConfig, dbWriter: NewPostgresCIDWriter(db), - } + }, nil } type BlockTx struct { @@ -76,7 +89,7 @@ type BlockTx struct { Close func(err error) error } -// Reporting function to run as goroutine +// ReportDBMetrics is a reporting function to run as goroutine func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) { if !metrics.Enabled { return @@ -95,7 +108,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo }() } -// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts) +// PushBlock pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts) // Returns an initiated DB transaction which must be Closed via defer to commit or rollback func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) { start, t := time.Now(), time.Now() @@ -250,6 +263,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he }) } +// processUncles publishes and indexes uncle IPLDs in Postgres func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error { // publish and index uncles for _, uncleNode := range uncleNodes { @@ -434,19 +448,32 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs return nil } +// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD database func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error { // publish the state node - stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) + if stateNode.NodeType == sdtypes.Removed { + // short circuit if it is a Removed node + // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + stateModel := models.StateNodeModel{ + Path: stateNode.Path, + StateKey: common.BytesToHash(stateNode.LeafKey).String(), + CID: removedNodeStateCID, + MhKey: removedNodeMhKey, + NodeType: stateNode.NodeType.Int(), + } + _, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) + return err + } + stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue) if err != nil { return fmt.Errorf("error publishing state node IPLD: %v", err) } - mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr) stateModel := models.StateNodeModel{ Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), CID: stateCIDStr, - MhKey: mhKey, - NodeType: ResolveFromNodeType(stateNode.NodeType), + MhKey: stateMhKey, + NodeType: stateNode.NodeType.Int(), } // index the state node, collect the stateID to reference by FK stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) @@ -478,17 +505,31 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN } // if there are any storage nodes associated with this node, publish and index them for _, storageNode := range stateNode.StorageNodes { - storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) + if storageNode.NodeType == sdtypes.Removed { + // short circuit if it is a Removed node + // this assumes the db has been initialized and a public.blocks entry for the Removed node is present + storageModel := models.StorageNodeModel{ + Path: storageNode.Path, + StorageKey: common.BytesToHash(storageNode.LeafKey).String(), + CID: removedNodeStorageCID, + MhKey: removedNodeMhKey, + NodeType: storageNode.NodeType.Int(), + } + if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { + return err + } + continue + } + storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue) if err != nil { return fmt.Errorf("error publishing storage node IPLD: %v", err) } - mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr) storageModel := models.StorageNodeModel{ Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), CID: storageCIDStr, - MhKey: mhKey, - NodeType: ResolveFromNodeType(storageNode.NodeType), + MhKey: storageMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { return err @@ -498,7 +539,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN return nil } -// Publishes code and codehash pairs to the ipld database +// PushCodeAndCodeHash publishes code and codehash pairs to the ipld database func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error { // codec doesn't matter since db key is multihash-based mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash) diff --git a/statediff/indexer/indexer_legacy_test.go b/statediff/indexer/indexer_legacy_test.go index 588107cc0..005f3e445 100644 --- a/statediff/indexer/indexer_legacy_test.go +++ b/statediff/indexer/indexer_legacy_test.go @@ -42,7 +42,8 @@ func setupLegacy(t *testing.T) { db, err = shared.SetupDB() require.NoError(t, err) - ind = indexer.NewStateDiffIndexer(legacyData.Config, db) + ind, err = indexer.NewStateDiffIndexer(legacyData.Config, db, false) + require.NoError(t, err) var tx *indexer.BlockTx tx, err = ind.PushBlock( mockLegacyBlock, diff --git a/statediff/indexer/indexer_test.go b/statediff/indexer/indexer_test.go index 70ab4cd90..725c69781 100644 --- a/statediff/indexer/indexer_test.go +++ b/statediff/indexer/indexer_test.go @@ -139,7 +139,8 @@ func setup(t *testing.T) { if err != nil { t.Fatal(err) } - ind = indexer.NewStateDiffIndexer(mocks.TestConfig, db) + ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db, false) + require.NoError(t, err) var tx *indexer.BlockTx tx, err = ind.PushBlock( mockBlock, diff --git a/statediff/indexer/shared/functions.go b/statediff/indexer/shared/functions.go index 92d5e6f2f..cb2ca6cea 100644 --- a/statediff/indexer/shared/functions.go +++ b/statediff/indexer/shared/functions.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld" + "github.com/ethereum/go-ethereum/statediff/indexer/postgres" "github.com/ipfs/go-cid" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -96,15 +97,16 @@ func MultihashKeyFromCIDString(c string) (string, error) { } // PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx -func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) { +// returns the CID and blockstore prefixed multihash key +func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, string, error) { c, err := ipld.RawdataToCid(codec, raw, mh) if err != nil { - return "", err + return "", "", err } dbKey := dshelp.MultihashToDsKey(c.Hash()) prefixedKey := blockstore.BlockPrefix.String() + dbKey.String() _, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw) - return c.String(), err + return c.String(), prefixedKey, err } // MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string @@ -117,8 +119,14 @@ func MultihashKeyFromKeccak256(hash common.Hash) (string, error) { return blockstore.BlockPrefix.String() + dbKey.String(), nil } -// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database +// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database in the provided tx func PublishDirect(tx *sqlx.Tx, key string, value []byte) error { _, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value) return err } + +// PublishDirectWithDB diretly writes a previously derived mhkey => value pair to the ipld database +func PublishDirectWithDB(db *postgres.DB, key string, value []byte) error { + _, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value) + return err +} diff --git a/statediff/service.go b/statediff/service.go index 7935c4887..2d5f8b0ab 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -165,7 +165,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params if err != nil { return err } - indexer = ind.NewStateDiffIndexer(blockChain.Config(), db) + indexer, err = ind.NewStateDiffIndexer(blockChain.Config(), db, params.DBParams.Init) + if err != nil { + return err + } } workers := params.NumWorkers if workers == 0 { diff --git a/statediff/types/types.go b/statediff/types/types.go index 08e2124fa..b3acdb743 100644 --- a/statediff/types/types.go +++ b/statediff/types/types.go @@ -22,16 +22,32 @@ package types import "github.com/ethereum/go-ethereum/common" // NodeType for explicitly setting type of node +// we use a string because it is RLP serializable, whereas an int is not type NodeType string const ( Unknown NodeType = "Unknown" - Leaf NodeType = "Leaf" - Extension NodeType = "Extension" Branch NodeType = "Branch" + Extension NodeType = "Extension" + Leaf NodeType = "Leaf" Removed NodeType = "Removed" // used to represent pathes which have been emptied ) +func (n NodeType) Int() int { + switch n { + case Branch: + return 0 + case Extension: + return 1 + case Leaf: + return 2 + case Removed: + return 3 + default: + return -1 + } +} + // StateNode holds the data for a single state diff node type StateNode struct { NodeType NodeType `json:"nodeType" gencodec:"required"` -- 2.45.2 From 2c45f0922111aa675e50270a9009817486a69f08 Mon Sep 17 00:00:00 2001 From: i-norden Date: Fri, 10 Sep 2021 14:53:05 -0500 Subject: [PATCH 3/5] linting --- cmd/utils/flags.go | 2 +- statediff/builder_test.go | 8 ++++---- statediff/indexer/helpers.go | 2 +- statediff/indexer/indexer.go | 14 +++++++------- statediff/types.go | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 899595868..5aa032676 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -786,7 +786,7 @@ var ( Usage: "PostgreSQL database connection string for writing state diffs", } StateDiffDBInitFlag = cli.BoolFlag{ - Name: "statediff.db.init", + Name: "statediff.db.init", Usage: "Whether or not the statediff database needs to be initialized; set true the first time this process is ran on a given database", } StateDiffDBNodeIDFlag = cli.StringFlag{ diff --git a/statediff/builder_test.go b/statediff/builder_test.go index 916652256..1cd8ea0f1 100644 --- a/statediff/builder_test.go +++ b/statediff/builder_test.go @@ -1481,10 +1481,10 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { StorageNodes: emptyStorage, }, { - Path: []byte{'\x06'}, - NodeType: sdtypes.Removed, - LeafKey: contractLeafKey, - NodeValue: []byte{}, + Path: []byte{'\x06'}, + NodeType: sdtypes.Removed, + LeafKey: contractLeafKey, + NodeValue: []byte{}, StorageNodes: emptyStorage, }, { diff --git a/statediff/indexer/helpers.go b/statediff/indexer/helpers.go index 0b3a9287d..4e4f30c19 100644 --- a/statediff/indexer/helpers.go +++ b/statediff/indexer/helpers.go @@ -23,7 +23,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/types" ) -// ResolveFromNodeType wrapper around NodeType.Int() so that we maintain backwards compatability +// ResolveFromNodeType wrapper around NodeType.Int() so that we maintain backwards compatibility func ResolveFromNodeType(nodeType types.NodeType) int { return nodeType.Int() } diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 7f6fe0dd1..7a784554c 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -49,8 +49,8 @@ var ( const ( removedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" - removedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" - removedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A" + removedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" + removedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A" ) // Indexer interface to allow substitution of mocks for testing @@ -65,7 +65,7 @@ type Indexer interface { type StateDiffIndexer struct { chainConfig *params.ChainConfig dbWriter *PostgresCIDWriter - init bool + init bool } // NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface @@ -509,11 +509,11 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN // short circuit if it is a Removed node // this assumes the db has been initialized and a public.blocks entry for the Removed node is present storageModel := models.StorageNodeModel{ - Path: storageNode.Path, + Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: removedNodeStorageCID, - MhKey: removedNodeMhKey, - NodeType: storageNode.NodeType.Int(), + CID: removedNodeStorageCID, + MhKey: removedNodeMhKey, + NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { return err diff --git a/statediff/types.go b/statediff/types.go index d56922c2f..f96718bce 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -39,7 +39,7 @@ type DBParams struct { ConnectionURL string ID string ClientName string - Init bool + Init bool } // Params is used to carry in parameters from subscribing/requesting clients configuration -- 2.45.2 From 55d90af25523550888520042455083828aba8d5f Mon Sep 17 00:00:00 2001 From: i-norden Date: Tue, 21 Sep 2021 12:31:07 -0500 Subject: [PATCH 4/5] test new handling of Removed nodes; don't require init flag --- cmd/geth/config.go | 3 -- cmd/geth/main.go | 1 - cmd/geth/usage.go | 1 - cmd/utils/flags.go | 4 -- statediff/indexer/indexer.go | 26 +++++---- statediff/indexer/indexer_legacy_test.go | 2 +- statediff/indexer/indexer_test.go | 67 ++++++++++++++++++++++-- statediff/indexer/mocks/test_data.go | 29 +++++----- statediff/indexer/shared/data_type.go | 3 +- statediff/indexer/shared/types.go | 2 +- statediff/service.go | 2 +- statediff/types.go | 1 - statediff/types/types.go | 3 +- 13 files changed, 95 insertions(+), 49 deletions(-) diff --git a/cmd/geth/config.go b/cmd/geth/config.go index cbe0df5a0..e884a4d42 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -187,9 +187,6 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } else { utils.Fatalf("Must specify client name for statediff DB output") } - if ctx.GlobalIsSet(utils.StateDiffDBInitFlag.Name) { - dbParams.Init = ctx.GlobalBool(utils.StateDiffDBInitFlag.Name) - } } else { if ctx.GlobalBool(utils.StateDiffWritingFlag.Name) { utils.Fatalf("Must pass DB parameters if enabling statediff write loop") diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 5e8132d2e..1e4f7497c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -150,7 +150,6 @@ var ( utils.MinerNotifyFullFlag, utils.StateDiffFlag, utils.StateDiffDBFlag, - utils.StateDiffDBInitFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index fb969ca31..8112a675f 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -232,7 +232,6 @@ var AppHelpFlagGroups = []flags.FlagGroup{ Flags: []cli.Flag{ utils.StateDiffFlag, utils.StateDiffDBFlag, - utils.StateDiffDBInitFlag, utils.StateDiffDBNodeIDFlag, utils.StateDiffDBClientNameFlag, utils.StateDiffWritingFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 5aa032676..9e6b963fd 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -785,10 +785,6 @@ var ( Name: "statediff.db", Usage: "PostgreSQL database connection string for writing state diffs", } - StateDiffDBInitFlag = cli.BoolFlag{ - Name: "statediff.db.init", - Usage: "Whether or not the statediff database needs to be initialized; set true the first time this process is ran on a given database", - } StateDiffDBNodeIDFlag = cli.StringFlag{ Name: "statediff.dbnodeid", Usage: "Node ID to use when writing state diffs to database", diff --git a/statediff/indexer/indexer.go b/statediff/indexer/indexer.go index 7a784554c..82761df77 100644 --- a/statediff/indexer/indexer.go +++ b/statediff/indexer/indexer.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -// This package provides an interface for pushing and indexing IPLD objects into a Postgres database +// Package indexer provides an interface for pushing and indexing IPLD objects into a Postgres database // Metrics for reporting processing and connection stats are defined in ./metrics.go package indexer @@ -48,9 +48,9 @@ var ( ) const ( - removedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" - removedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" - removedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A" + RemovedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" + RemovedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya" + RemovedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A" ) // Indexer interface to allow substitution of mocks for testing @@ -69,12 +69,10 @@ type StateDiffIndexer struct { } // NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface -func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB, init bool) (*StateDiffIndexer, error) { - // If this is the first time writing to this db, write the public.blocks entry for an empty node (for Removed state and storage node types) - if init { - if err := shared.PublishDirectWithDB(db, removedNodeMhKey, []byte{}); err != nil { - return nil, err - } +func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*StateDiffIndexer, error) { + // Write the removed node to the db on init + if err := shared.PublishDirectWithDB(db, RemovedNodeMhKey, []byte{}); err != nil { + return nil, err } return &StateDiffIndexer{ chainConfig: chainConfig, @@ -457,8 +455,8 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN stateModel := models.StateNodeModel{ Path: stateNode.Path, StateKey: common.BytesToHash(stateNode.LeafKey).String(), - CID: removedNodeStateCID, - MhKey: removedNodeMhKey, + CID: RemovedNodeStateCID, + MhKey: RemovedNodeMhKey, NodeType: stateNode.NodeType.Int(), } _, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID) @@ -511,8 +509,8 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN storageModel := models.StorageNodeModel{ Path: storageNode.Path, StorageKey: common.BytesToHash(storageNode.LeafKey).String(), - CID: removedNodeStorageCID, - MhKey: removedNodeMhKey, + CID: RemovedNodeStorageCID, + MhKey: RemovedNodeMhKey, NodeType: storageNode.NodeType.Int(), } if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil { diff --git a/statediff/indexer/indexer_legacy_test.go b/statediff/indexer/indexer_legacy_test.go index 005f3e445..4b1563190 100644 --- a/statediff/indexer/indexer_legacy_test.go +++ b/statediff/indexer/indexer_legacy_test.go @@ -42,7 +42,7 @@ func setupLegacy(t *testing.T) { db, err = shared.SetupDB() require.NoError(t, err) - ind, err = indexer.NewStateDiffIndexer(legacyData.Config, db, false) + ind, err = indexer.NewStateDiffIndexer(legacyData.Config, db) require.NoError(t, err) var tx *indexer.BlockTx tx, err = ind.PushBlock( diff --git a/statediff/indexer/indexer_test.go b/statediff/indexer/indexer_test.go index 725c69781..67645a12d 100644 --- a/statediff/indexer/indexer_test.go +++ b/statediff/indexer/indexer_test.go @@ -139,7 +139,7 @@ func setup(t *testing.T) { if err != nil { t.Fatal(err) } - ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db, false) + ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db) require.NoError(t, err) var tx *indexer.BlockTx tx, err = ind.PushBlock( @@ -471,7 +471,7 @@ func TestPublishAndIndexer(t *testing.T) { stateNodes := make([]models.StateNodeModel, 0) pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) - WHERE header_cids.block_number = $1` + WHERE header_cids.block_number = $1 AND node_type != 3` err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -524,6 +524,33 @@ func TestPublishAndIndexer(t *testing.T) { }) } } + + // check that Removed state nodes were properly indexed and published + stateNodes = make([]models.StateNodeModel, 0) + pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id + FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id) + WHERE header_cids.block_number = $1 AND node_type = 3` + err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + shared.ExpectEqual(t, len(stateNodes), 1) + stateNode := stateNodes[0] + var data []byte + dc, err := cid.Decode(stateNode.CID) + if err != nil { + t.Fatal(err) + } + mhKey := dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey := blockstore.BlockPrefix.String() + mhKey.String() + shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey) + err = db.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + shared.ExpectEqual(t, stateNode.CID, indexer.RemovedNodeStateCID) + shared.ExpectEqual(t, stateNode.Path, []byte{'\x02'}) + shared.ExpectEqual(t, data, []byte{}) }) t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) { @@ -535,7 +562,8 @@ func TestPublishAndIndexer(t *testing.T) { FROM eth.storage_cids, eth.state_cids, eth.header_cids WHERE storage_cids.state_id = state_cids.id AND state_cids.header_id = header_cids.id - AND header_cids.block_number = $1` + AND header_cids.block_number = $1 + AND storage_cids.node_type != 3` err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64()) if err != nil { t.Fatal(err) @@ -560,5 +588,38 @@ func TestPublishAndIndexer(t *testing.T) { t.Fatal(err) } shared.ExpectEqual(t, data, mocks.StorageLeafNode) + + // check that Removed storage nodes were properly indexed + storageNodes = make([]models.StorageNodeWithStateKeyModel, 0) + pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path + FROM eth.storage_cids, eth.state_cids, eth.header_cids + WHERE storage_cids.state_id = state_cids.id + AND state_cids.header_id = header_cids.id + AND header_cids.block_number = $1 + AND storage_cids.node_type = 3` + err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64()) + if err != nil { + t.Fatal(err) + } + shared.ExpectEqual(t, len(storageNodes), 1) + shared.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{ + CID: indexer.RemovedNodeStorageCID, + NodeType: 3, + StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(), + StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(), + Path: []byte{'\x03'}, + }) + dc, err = cid.Decode(storageNodes[0].CID) + if err != nil { + t.Fatal(err) + } + mhKey = dshelp.MultihashToDsKey(dc.Hash()) + prefixedKey = blockstore.BlockPrefix.String() + mhKey.String() + shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey) + err = db.Get(&data, ipfsPgGet, prefixedKey) + if err != nil { + t.Fatal(err) + } + shared.ExpectEqual(t, data, []byte{}) }) } diff --git a/statediff/indexer/mocks/test_data.go b/statediff/indexer/mocks/test_data.go index 635c7fa3b..e658640b9 100644 --- a/statediff/indexer/mocks/test_data.go +++ b/statediff/indexer/mocks/test_data.go @@ -129,6 +129,7 @@ var ( AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421" AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470") AccountLeafKey = testhelpers.Account2LeafKey + RemovedLeafKey = testhelpers.Account1LeafKey Account, _ = rlp.EncodeToBytes(state.Account{ Nonce: nonce0, Balance: big.NewInt(1000), @@ -154,6 +155,12 @@ var ( LeafKey: StorageLeafKey, NodeValue: StorageLeafNode, }, + { + Path: []byte{'\x03'}, + NodeType: sdtypes.Removed, + LeafKey: RemovedLeafKey, + NodeValue: []byte{}, + }, }, }, { @@ -163,25 +170,15 @@ var ( NodeValue: AccountLeafNode, StorageNodes: []sdtypes.StorageNode{}, }, + { + Path: []byte{'\x02'}, + NodeType: sdtypes.Removed, + LeafKey: RemovedLeafKey, + NodeValue: []byte{}, + }, } ) -/* -// AccessListTx is the data of EIP-2930 access list transactions. -type AccessListTx struct { - ChainID *big.Int // destination chain ID - Nonce uint64 // nonce of sender account - GasPrice *big.Int // wei per gas - Gas uint64 // gas limit - To *common.Address `rlp:"nil"` // nil means contract creation - Value *big.Int // wei amount - Data []byte // contract invocation input data - AccessList AccessList // EIP-2930 access list - V, R, S *big.Int // signature values -} - -*/ - type LegacyData struct { Config *params.ChainConfig BlockNumber *big.Int diff --git a/statediff/indexer/shared/data_type.go b/statediff/indexer/shared/data_type.go index 01fed57f7..ccab92c1e 100644 --- a/statediff/indexer/shared/data_type.go +++ b/statediff/indexer/shared/data_type.go @@ -57,7 +57,7 @@ func (r DataType) String() string { } } -// GenerateDataTypeFromString +// GenerateDataTypeFromString returns a DataType from a provided string func GenerateDataTypeFromString(str string) (DataType, error) { switch strings.ToLower(str) { case "full", "f": @@ -79,6 +79,7 @@ func GenerateDataTypeFromString(str string) (DataType, error) { } } +// SupportedDataType returns whether a DataType is supported func SupportedDataType(d DataType) (bool, error) { switch d { case Full: diff --git a/statediff/indexer/shared/types.go b/statediff/indexer/shared/types.go index 544d4e07e..1337ba68a 100644 --- a/statediff/indexer/shared/types.go +++ b/statediff/indexer/shared/types.go @@ -22,7 +22,7 @@ import ( "github.com/ethereum/go-ethereum/statediff/types" ) -// Trie struct used to flag node as leaf or not +// TrieNode struct used to flag node as leaf or not type TrieNode struct { Path []byte LeafKey common.Hash diff --git a/statediff/service.go b/statediff/service.go index 2d5f8b0ab..cc7b821a3 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -165,7 +165,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params if err != nil { return err } - indexer, err = ind.NewStateDiffIndexer(blockChain.Config(), db, params.DBParams.Init) + indexer, err = ind.NewStateDiffIndexer(blockChain.Config(), db) if err != nil { return err } diff --git a/statediff/types.go b/statediff/types.go index f96718bce..148567dd7 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -39,7 +39,6 @@ type DBParams struct { ConnectionURL string ID string ClientName string - Init bool } // Params is used to carry in parameters from subscribing/requesting clients configuration diff --git a/statediff/types/types.go b/statediff/types/types.go index b3acdb743..56babfb5b 100644 --- a/statediff/types/types.go +++ b/statediff/types/types.go @@ -22,7 +22,6 @@ package types import "github.com/ethereum/go-ethereum/common" // NodeType for explicitly setting type of node -// we use a string because it is RLP serializable, whereas an int is not type NodeType string const ( @@ -30,7 +29,7 @@ const ( Branch NodeType = "Branch" Extension NodeType = "Extension" Leaf NodeType = "Leaf" - Removed NodeType = "Removed" // used to represent pathes which have been emptied + Removed NodeType = "Removed" // used to represent paths which have been emptied ) func (n NodeType) Int() int { -- 2.45.2 From 937393d87c7fe9eabefd8478088d27597b2d3dc0 Mon Sep 17 00:00:00 2001 From: i-norden Date: Wed, 22 Sep 2021 14:09:49 -0500 Subject: [PATCH 5/5] log metrics --- statediff/indexer/metrics.go | 4 ++++ statediff/indexer/writer.go | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/statediff/indexer/metrics.go b/statediff/indexer/metrics.go index e1da3919c..2d37816f6 100644 --- a/statediff/indexer/metrics.go +++ b/statediff/indexer/metrics.go @@ -31,6 +31,8 @@ type indexerMetricsHandles struct { transactions metrics.Counter // The total number of processed receipts receipts metrics.Counter + // The total number of processed logs + logs metrics.Counter // The total number of access list entries processed accessListEntries metrics.Counter // Time spent waiting for free postgres tx @@ -52,6 +54,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { blocks: metrics.NewCounter(), transactions: metrics.NewCounter(), receipts: metrics.NewCounter(), + logs: metrics.NewCounter(), accessListEntries: metrics.NewCounter(), tFreePostgres: metrics.NewTimer(), tPostgresCommit: metrics.NewTimer(), @@ -64,6 +67,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles { reg.Register(metricName(subsys, "blocks"), ctx.blocks) reg.Register(metricName(subsys, "transactions"), ctx.transactions) reg.Register(metricName(subsys, "receipts"), ctx.receipts) + reg.Register(metricName(subsys, "logs"), ctx.logs) reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries) reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres) reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit) diff --git a/statediff/indexer/writer.go b/statediff/indexer/writer.go index 60ed4fbc0..62b36ca58 100644 --- a/statediff/indexer/writer.go +++ b/statediff/indexer/writer.go @@ -30,7 +30,7 @@ var ( nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") ) -// Handles processing and writing of indexed IPLD objects to Postgres +// PostgresCIDWriter handles processing and writing of indexed IPLD objects to Postgres type PostgresCIDWriter struct { db *postgres.DB } @@ -112,8 +112,8 @@ func (in *PostgresCIDWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel, if err != nil { return fmt.Errorf("error upserting logs entry: %w", err) } + indexerMetrics.logs.Inc(1) } - // TODO: Add metrics for logs. return nil } -- 2.45.2