V1.10.8 statediff #123

Merged
telackey merged 7 commits from v1.10.8-statediff into statediff 2021-09-23 13:57:13 +00:00
14 changed files with 185 additions and 63 deletions

View File

@ -81,6 +81,7 @@ This service introduces a CLI flag namespace `statediff`
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
`--statediff.db` is the connection string for the Postgres database to write to
`--statediff.db.init` indicates whether we need to initialize a new database; set true if it's the first time running the process on a given database
`--statediff.dbnodeid` is the node id to use in the Postgres database
`--statediff.dbclientname` is the client name to use in the Postgres database
@ -88,7 +89,7 @@ The service can only operate in full sync mode (`--syncmode=full`), but only the
e.g.
`
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName}
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.db.init=true --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName}
`
### RPC endpoints

View File

@ -1481,10 +1481,11 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
StorageNodes: emptyStorage,
},
{
Path: []byte{'\x06'},
NodeType: sdtypes.Removed,
LeafKey: contractLeafKey,
NodeValue: []byte{},
Path: []byte{'\x06'},
NodeType: sdtypes.Removed,
LeafKey: contractLeafKey,
NodeValue: []byte{},
StorageNodes: emptyStorage,
},
{
Path: []byte{'\x0c'},

View File

@ -23,19 +23,9 @@ import (
"github.com/ethereum/go-ethereum/statediff/types"
)
// ResolveFromNodeType is a thin wrapper around NodeType.Int, retained so that
// existing callers keep working (backwards compatibility) after the mapping
// logic moved onto the NodeType type itself.
func ResolveFromNodeType(nodeType types.NodeType) int {
	return nodeType.Int()
}
// ChainConfig returns the appropriate ethereum chain config for the provided chain id

View File

@ -14,7 +14,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
// Package indexer provides an interface for pushing and indexing IPLD objects into a Postgres database
// Metrics for reporting processing and connection stats are defined in ./metrics.go
package indexer
@ -47,6 +47,12 @@ var (
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
)
const (
RemovedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
RemovedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
RemovedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A"
)
// Indexer interface to allow substitution of mocks for testing
type Indexer interface {
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
@ -59,14 +65,19 @@ type Indexer interface {
type StateDiffIndexer struct {
chainConfig *params.ChainConfig
dbWriter *PostgresCIDWriter
init bool
}
// NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) *StateDiffIndexer {
func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*StateDiffIndexer, error) {
// Write the removed node to the db on init
if err := shared.PublishDirectWithDB(db, RemovedNodeMhKey, []byte{}); err != nil {
return nil, err
}
return &StateDiffIndexer{
chainConfig: chainConfig,
dbWriter: NewPostgresCIDWriter(db),
}
}, nil
}
type BlockTx struct {
@ -76,7 +87,7 @@ type BlockTx struct {
Close func(err error) error
}
// Reporting function to run as goroutine
// ReportDBMetrics is a reporting function to run as goroutine
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
if !metrics.Enabled {
return
@ -95,7 +106,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
}()
}
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
// PushBlock pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
start, t := time.Now(), time.Now()
@ -250,6 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
})
}
// processUncles publishes and indexes uncle IPLDs in Postgres
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
// publish and index uncles
for _, uncleNode := range uncleNodes {
@ -434,19 +446,32 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
return nil
}
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD database
func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error {
// publish the state node
stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if stateNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
stateModel := models.StateNodeModel{
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: RemovedNodeStateCID,
MhKey: RemovedNodeMhKey,
NodeType: stateNode.NodeType.Int(),
}
_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
return err
}
stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing state node IPLD: %v", err)
}
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
stateModel := models.StateNodeModel{
Path: stateNode.Path,
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
CID: stateCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(stateNode.NodeType),
MhKey: stateMhKey,
NodeType: stateNode.NodeType.Int(),
}
// index the state node, collect the stateID to reference by FK
stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
@ -478,17 +503,31 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
}
// if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageNodes {
storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if storageNode.NodeType == sdtypes.Removed {
// short circuit if it is a Removed node
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: RemovedNodeStorageCID,
MhKey: RemovedNodeMhKey,
NodeType: storageNode.NodeType.Int(),
}
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
return err
}
continue
}
storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
if err != nil {
return fmt.Errorf("error publishing storage node IPLD: %v", err)
}
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
storageModel := models.StorageNodeModel{
Path: storageNode.Path,
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
CID: storageCIDStr,
MhKey: mhKey,
NodeType: ResolveFromNodeType(storageNode.NodeType),
MhKey: storageMhKey,
NodeType: storageNode.NodeType.Int(),
}
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
return err
@ -498,7 +537,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
return nil
}
// Publishes code and codehash pairs to the ipld database
// PushCodeAndCodeHash publishes code and codehash pairs to the ipld database
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
// codec doesn't matter since db key is multihash-based
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)

View File

@ -42,7 +42,8 @@ func setupLegacy(t *testing.T) {
db, err = shared.SetupDB()
require.NoError(t, err)
ind = indexer.NewStateDiffIndexer(legacyData.Config, db)
ind, err = indexer.NewStateDiffIndexer(legacyData.Config, db)
require.NoError(t, err)
var tx *indexer.BlockTx
tx, err = ind.PushBlock(
mockLegacyBlock,

View File

@ -139,7 +139,8 @@ func setup(t *testing.T) {
if err != nil {
t.Fatal(err)
}
ind = indexer.NewStateDiffIndexer(mocks.TestConfig, db)
ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db)
require.NoError(t, err)
var tx *indexer.BlockTx
tx, err = ind.PushBlock(
mockBlock,
@ -470,7 +471,7 @@ func TestPublishAndIndexer(t *testing.T) {
stateNodes := make([]models.StateNodeModel, 0)
pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1`
WHERE header_cids.block_number = $1 AND node_type != 3`
err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
@ -523,6 +524,33 @@ func TestPublishAndIndexer(t *testing.T) {
})
}
}
// check that Removed state nodes were properly indexed and published
stateNodes = make([]models.StateNodeModel, 0)
pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
WHERE header_cids.block_number = $1 AND node_type = 3`
err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
shared.ExpectEqual(t, len(stateNodes), 1)
stateNode := stateNodes[0]
var data []byte
dc, err := cid.Decode(stateNode.CID)
if err != nil {
t.Fatal(err)
}
mhKey := dshelp.MultihashToDsKey(dc.Hash())
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey)
err = db.Get(&data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
shared.ExpectEqual(t, stateNode.CID, indexer.RemovedNodeStateCID)
shared.ExpectEqual(t, stateNode.Path, []byte{'\x02'})
shared.ExpectEqual(t, data, []byte{})
})
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
@ -534,7 +562,8 @@ func TestPublishAndIndexer(t *testing.T) {
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1`
AND header_cids.block_number = $1
AND storage_cids.node_type != 3`
err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
@ -559,5 +588,38 @@ func TestPublishAndIndexer(t *testing.T) {
t.Fatal(err)
}
shared.ExpectEqual(t, data, mocks.StorageLeafNode)
// check that Removed storage nodes were properly indexed
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
FROM eth.storage_cids, eth.state_cids, eth.header_cids
WHERE storage_cids.state_id = state_cids.id
AND state_cids.header_id = header_cids.id
AND header_cids.block_number = $1
AND storage_cids.node_type = 3`
err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
if err != nil {
t.Fatal(err)
}
shared.ExpectEqual(t, len(storageNodes), 1)
shared.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
CID: indexer.RemovedNodeStorageCID,
NodeType: 3,
StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(),
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
Path: []byte{'\x03'},
})
dc, err = cid.Decode(storageNodes[0].CID)
if err != nil {
t.Fatal(err)
}
mhKey = dshelp.MultihashToDsKey(dc.Hash())
prefixedKey = blockstore.BlockPrefix.String() + mhKey.String()
shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey)
err = db.Get(&data, ipfsPgGet, prefixedKey)
if err != nil {
t.Fatal(err)
}
shared.ExpectEqual(t, data, []byte{})
})
}

View File

@ -31,6 +31,8 @@ type indexerMetricsHandles struct {
transactions metrics.Counter
// The total number of processed receipts
receipts metrics.Counter
// The total number of processed logs
logs metrics.Counter
// The total number of access list entries processed
accessListEntries metrics.Counter
// Time spent waiting for free postgres tx
@ -52,6 +54,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
blocks: metrics.NewCounter(),
transactions: metrics.NewCounter(),
receipts: metrics.NewCounter(),
logs: metrics.NewCounter(),
accessListEntries: metrics.NewCounter(),
tFreePostgres: metrics.NewTimer(),
tPostgresCommit: metrics.NewTimer(),
@ -64,6 +67,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
reg.Register(metricName(subsys, "logs"), ctx.logs)
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)

View File

@ -129,6 +129,7 @@ var (
AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
AccountLeafKey = testhelpers.Account2LeafKey
RemovedLeafKey = testhelpers.Account1LeafKey
Account, _ = rlp.EncodeToBytes(state.Account{
Nonce: nonce0,
Balance: big.NewInt(1000),
@ -154,6 +155,12 @@ var (
LeafKey: StorageLeafKey,
NodeValue: StorageLeafNode,
},
{
Path: []byte{'\x03'},
NodeType: sdtypes.Removed,
LeafKey: RemovedLeafKey,
NodeValue: []byte{},
},
},
},
{
@ -163,25 +170,15 @@ var (
NodeValue: AccountLeafNode,
StorageNodes: []sdtypes.StorageNode{},
},
{
Path: []byte{'\x02'},
NodeType: sdtypes.Removed,
LeafKey: RemovedLeafKey,
NodeValue: []byte{},
},
}
)
/*
// AccessListTx is the data of EIP-2930 access list transactions.
type AccessListTx struct {
ChainID *big.Int // destination chain ID
Nonce uint64 // nonce of sender account
GasPrice *big.Int // wei per gas
Gas uint64 // gas limit
To *common.Address `rlp:"nil"` // nil means contract creation
Value *big.Int // wei amount
Data []byte // contract invocation input data
AccessList AccessList // EIP-2930 access list
V, R, S *big.Int // signature values
}
*/
type LegacyData struct {
Config *params.ChainConfig
BlockNumber *big.Int

View File

@ -57,7 +57,7 @@ func (r DataType) String() string {
}
}
// GenerateDataTypeFromString
// GenerateDataTypeFromString returns a DataType from a provided string
func GenerateDataTypeFromString(str string) (DataType, error) {
switch strings.ToLower(str) {
case "full", "f":
@ -79,6 +79,7 @@ func GenerateDataTypeFromString(str string) (DataType, error) {
}
}
// SupportedDataType returns whether a DataType is supported
func SupportedDataType(d DataType) (bool, error) {
switch d {
case Full:

View File

@ -20,6 +20,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
@ -96,15 +97,16 @@ func MultihashKeyFromCIDString(c string) (string, error) {
}
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) {
// returns the CID and blockstore prefixed multihash key
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, string, error) {
c, err := ipld.RawdataToCid(codec, raw, mh)
if err != nil {
return "", err
return "", "", err
}
dbKey := dshelp.MultihashToDsKey(c.Hash())
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
_, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
return c.String(), err
return c.String(), prefixedKey, err
}
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
@ -117,8 +119,14 @@ func MultihashKeyFromKeccak256(hash common.Hash) (string, error) {
return blockstore.BlockPrefix.String() + dbKey.String(), nil
}
// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database
// PublishDirect directly writes a previously derived mhkey => value pair to the ipld database in the provided tx.
// The insert is idempotent: ON CONFLICT (key) DO NOTHING makes re-publishing an existing key a no-op.
func PublishDirect(tx *sqlx.Tx, key string, value []byte) error {
	_, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
	return err
}
// PublishDirectWithDB directly writes a previously derived mhkey => value pair to the ipld database,
// operating on the db handle itself rather than an open transaction.
func PublishDirectWithDB(db *postgres.DB, key string, value []byte) error {
	_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
	return err
}

View File

@ -22,7 +22,7 @@ import (
"github.com/ethereum/go-ethereum/statediff/types"
)
// Trie struct used to flag node as leaf or not
// TrieNode struct used to flag node as leaf or not
type TrieNode struct {
Path []byte
LeafKey common.Hash

View File

@ -30,7 +30,7 @@ var (
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
)
// Handles processing and writing of indexed IPLD objects to Postgres
// PostgresCIDWriter handles processing and writing of indexed IPLD objects to Postgres
type PostgresCIDWriter struct {
db *postgres.DB
}
@ -112,8 +112,8 @@ func (in *PostgresCIDWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel,
if err != nil {
return fmt.Errorf("error upserting logs entry: %w", err)
}
indexerMetrics.logs.Inc(1)
}
// TODO: Add metrics for logs.
return nil
}

View File

@ -165,7 +165,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
if err != nil {
return err
}
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
indexer, err = ind.NewStateDiffIndexer(blockChain.Config(), db)
if err != nil {
return err
}
}
workers := params.NumWorkers
if workers == 0 {

View File

@ -26,12 +26,27 @@ type NodeType string
const (
Unknown NodeType = "Unknown"
Leaf NodeType = "Leaf"
Extension NodeType = "Extension"
Branch NodeType = "Branch"
Removed NodeType = "Removed" // used to represent pathes which have been emptied
Extension NodeType = "Extension"
Leaf NodeType = "Leaf"
Removed NodeType = "Removed" // used to represent paths which have been emptied
)
// Int returns the integer database encoding for the NodeType:
// Branch=0, Extension=1, Leaf=2, Removed=3; any other value maps to -1.
func (n NodeType) Int() int {
	codes := map[NodeType]int{
		Branch:    0,
		Extension: 1,
		Leaf:      2,
		Removed:   3,
	}
	if code, ok := codes[n]; ok {
		return code
	}
	return -1
}
// StateNode holds the data for a single state diff node
type StateNode struct {
NodeType NodeType `json:"nodeType" gencodec:"required"`