V1.10.8 statediff #123
@ -81,6 +81,7 @@ This service introduces a CLI flag namespace `statediff`
|
||||
`--statediff.writing` is used to tell the service to write state diff objects it produces from synced ChainEvents directly to a configured Postgres database
|
||||
`--statediff.workers` is used to set the number of concurrent workers to process state diff objects and write them into the database
|
||||
`--statediff.db` is the connection string for the Postgres database to write to
|
||||
`--statediff.db.init` indicates whether we need to initialize a new database; set true if its the first time running the process on a given database
|
||||
`--statediff.dbnodeid` is the node id to use in the Postgres database
|
||||
`--statediff.dbclientname` is the client name to use in the Postgres database
|
||||
|
||||
@ -88,7 +89,7 @@ The service can only operate in full sync mode (`--syncmode=full`), but only the
|
||||
|
||||
e.g.
|
||||
`
|
||||
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName}
|
||||
./build/bin/geth --syncmode=full --gcmode=archive --statediff --statediff.writing --statediff.db=postgres://localhost:5432/vulcanize_testing?sslmode=disable --statediff.db.init=true --statediff.dbnodeid={nodeId} --statediff.dbclientname={dbClientName}
|
||||
`
|
||||
|
||||
### RPC endpoints
|
||||
|
||||
@ -1481,10 +1481,11 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
|
||||
StorageNodes: emptyStorage,
|
||||
},
|
||||
{
|
||||
Path: []byte{'\x06'},
|
||||
NodeType: sdtypes.Removed,
|
||||
LeafKey: contractLeafKey,
|
||||
NodeValue: []byte{},
|
||||
Path: []byte{'\x06'},
|
||||
NodeType: sdtypes.Removed,
|
||||
LeafKey: contractLeafKey,
|
||||
NodeValue: []byte{},
|
||||
StorageNodes: emptyStorage,
|
||||
},
|
||||
{
|
||||
Path: []byte{'\x0c'},
|
||||
|
||||
@ -23,19 +23,9 @@ import (
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
// ResolveFromNodeType wrapper around NodeType.Int() so that we maintain backwards compatibility
|
||||
func ResolveFromNodeType(nodeType types.NodeType) int {
|
||||
switch nodeType {
|
||||
case types.Branch:
|
||||
return 0
|
||||
case types.Extension:
|
||||
return 1
|
||||
case types.Leaf:
|
||||
return 2
|
||||
case types.Removed:
|
||||
return 3
|
||||
default:
|
||||
return -1
|
||||
}
|
||||
return nodeType.Int()
|
||||
}
|
||||
|
||||
// ChainConfig returns the appropriate ethereum chain config for the provided chain id
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
// This package provides an interface for pushing and indexing IPLD objects into a Postgres database
|
||||
// Package indexer provides an interface for pushing and indexing IPLD objects into a Postgres database
|
||||
// Metrics for reporting processing and connection stats are defined in ./metrics.go
|
||||
package indexer
|
||||
|
||||
@ -47,6 +47,12 @@ var (
|
||||
dbMetrics = RegisterDBMetrics(metrics.DefaultRegistry)
|
||||
)
|
||||
|
||||
const (
|
||||
RemovedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
|
||||
RemovedNodeStateCID = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
|
||||
RemovedNodeMhKey = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A"
|
||||
)
|
||||
|
||||
// Indexer interface to allow substitution of mocks for testing
|
||||
type Indexer interface {
|
||||
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
|
||||
@ -59,14 +65,19 @@ type Indexer interface {
|
||||
type StateDiffIndexer struct {
|
||||
chainConfig *params.ChainConfig
|
||||
dbWriter *PostgresCIDWriter
|
||||
init bool
|
||||
}
|
||||
|
||||
// NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
|
||||
func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) *StateDiffIndexer {
|
||||
func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*StateDiffIndexer, error) {
|
||||
// Write the removed node to the db on init
|
||||
if err := shared.PublishDirectWithDB(db, RemovedNodeMhKey, []byte{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &StateDiffIndexer{
|
||||
chainConfig: chainConfig,
|
||||
dbWriter: NewPostgresCIDWriter(db),
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type BlockTx struct {
|
||||
@ -76,7 +87,7 @@ type BlockTx struct {
|
||||
Close func(err error) error
|
||||
}
|
||||
|
||||
// Reporting function to run as goroutine
|
||||
// ReportDBMetrics is a reporting function to run as goroutine
|
||||
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
|
||||
if !metrics.Enabled {
|
||||
return
|
||||
@ -95,7 +106,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
|
||||
}()
|
||||
}
|
||||
|
||||
// Pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// PushBlock pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
||||
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
||||
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
||||
start, t := time.Now(), time.Now()
|
||||
@ -250,6 +261,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, he
|
||||
})
|
||||
}
|
||||
|
||||
// processUncles publishes and indexes uncle IPLDs in Postgres
|
||||
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
|
||||
// publish and index uncles
|
||||
for _, uncleNode := range uncleNodes {
|
||||
@ -434,19 +446,32 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs
|
||||
return nil
|
||||
}
|
||||
|
||||
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD database
|
||||
func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error {
|
||||
// publish the state node
|
||||
stateCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||
if stateNode.NodeType == sdtypes.Removed {
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
|
||||
stateModel := models.StateNodeModel{
|
||||
Path: stateNode.Path,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
CID: RemovedNodeStateCID,
|
||||
MhKey: RemovedNodeMhKey,
|
||||
NodeType: stateNode.NodeType.Int(),
|
||||
}
|
||||
_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
|
||||
return err
|
||||
}
|
||||
stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error publishing state node IPLD: %v", err)
|
||||
}
|
||||
mhKey, _ := shared.MultihashKeyFromCIDString(stateCIDStr)
|
||||
stateModel := models.StateNodeModel{
|
||||
Path: stateNode.Path,
|
||||
StateKey: common.BytesToHash(stateNode.LeafKey).String(),
|
||||
CID: stateCIDStr,
|
||||
MhKey: mhKey,
|
||||
NodeType: ResolveFromNodeType(stateNode.NodeType),
|
||||
MhKey: stateMhKey,
|
||||
NodeType: stateNode.NodeType.Int(),
|
||||
}
|
||||
// index the state node, collect the stateID to reference by FK
|
||||
stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
|
||||
@ -478,17 +503,31 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
|
||||
}
|
||||
// if there are any storage nodes associated with this node, publish and index them
|
||||
for _, storageNode := range stateNode.StorageNodes {
|
||||
storageCIDStr, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
||||
if storageNode.NodeType == sdtypes.Removed {
|
||||
// short circuit if it is a Removed node
|
||||
// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
|
||||
storageModel := models.StorageNodeModel{
|
||||
Path: storageNode.Path,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: RemovedNodeStorageCID,
|
||||
MhKey: RemovedNodeMhKey,
|
||||
NodeType: storageNode.NodeType.Int(),
|
||||
}
|
||||
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error publishing storage node IPLD: %v", err)
|
||||
}
|
||||
mhKey, _ := shared.MultihashKeyFromCIDString(storageCIDStr)
|
||||
storageModel := models.StorageNodeModel{
|
||||
Path: storageNode.Path,
|
||||
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
|
||||
CID: storageCIDStr,
|
||||
MhKey: mhKey,
|
||||
NodeType: ResolveFromNodeType(storageNode.NodeType),
|
||||
MhKey: storageMhKey,
|
||||
NodeType: storageNode.NodeType.Int(),
|
||||
}
|
||||
if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
|
||||
return err
|
||||
@ -498,7 +537,7 @@ func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateN
|
||||
return nil
|
||||
}
|
||||
|
||||
// Publishes code and codehash pairs to the ipld database
|
||||
// PushCodeAndCodeHash publishes code and codehash pairs to the ipld database
|
||||
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
|
||||
// codec doesn't matter since db key is multihash-based
|
||||
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
|
||||
|
||||
@ -42,7 +42,8 @@ func setupLegacy(t *testing.T) {
|
||||
db, err = shared.SetupDB()
|
||||
require.NoError(t, err)
|
||||
|
||||
ind = indexer.NewStateDiffIndexer(legacyData.Config, db)
|
||||
ind, err = indexer.NewStateDiffIndexer(legacyData.Config, db)
|
||||
require.NoError(t, err)
|
||||
var tx *indexer.BlockTx
|
||||
tx, err = ind.PushBlock(
|
||||
mockLegacyBlock,
|
||||
|
||||
@ -139,7 +139,8 @@ func setup(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
ind = indexer.NewStateDiffIndexer(mocks.TestConfig, db)
|
||||
ind, err = indexer.NewStateDiffIndexer(mocks.TestConfig, db)
|
||||
require.NoError(t, err)
|
||||
var tx *indexer.BlockTx
|
||||
tx, err = ind.PushBlock(
|
||||
mockBlock,
|
||||
@ -470,7 +471,7 @@ func TestPublishAndIndexer(t *testing.T) {
|
||||
stateNodes := make([]models.StateNodeModel, 0)
|
||||
pgStr := `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
|
||||
WHERE header_cids.block_number = $1`
|
||||
WHERE header_cids.block_number = $1 AND node_type != 3`
|
||||
err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -523,6 +524,33 @@ func TestPublishAndIndexer(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// check that Removed state nodes were properly indexed and published
|
||||
stateNodes = make([]models.StateNodeModel, 0)
|
||||
pgStr = `SELECT state_cids.id, state_cids.cid, state_cids.state_leaf_key, state_cids.node_type, state_cids.state_path, state_cids.header_id
|
||||
FROM eth.state_cids INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
|
||||
WHERE header_cids.block_number = $1 AND node_type = 3`
|
||||
err = db.Select(&stateNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shared.ExpectEqual(t, len(stateNodes), 1)
|
||||
stateNode := stateNodes[0]
|
||||
var data []byte
|
||||
dc, err := cid.Decode(stateNode.CID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mhKey := dshelp.MultihashToDsKey(dc.Hash())
|
||||
prefixedKey := blockstore.BlockPrefix.String() + mhKey.String()
|
||||
shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey)
|
||||
err = db.Get(&data, ipfsPgGet, prefixedKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shared.ExpectEqual(t, stateNode.CID, indexer.RemovedNodeStateCID)
|
||||
shared.ExpectEqual(t, stateNode.Path, []byte{'\x02'})
|
||||
shared.ExpectEqual(t, data, []byte{})
|
||||
})
|
||||
|
||||
t.Run("Publish and index storage IPLDs in a single tx", func(t *testing.T) {
|
||||
@ -534,7 +562,8 @@ func TestPublishAndIndexer(t *testing.T) {
|
||||
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||
WHERE storage_cids.state_id = state_cids.id
|
||||
AND state_cids.header_id = header_cids.id
|
||||
AND header_cids.block_number = $1`
|
||||
AND header_cids.block_number = $1
|
||||
AND storage_cids.node_type != 3`
|
||||
err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -559,5 +588,38 @@ func TestPublishAndIndexer(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shared.ExpectEqual(t, data, mocks.StorageLeafNode)
|
||||
|
||||
// check that Removed storage nodes were properly indexed
|
||||
storageNodes = make([]models.StorageNodeWithStateKeyModel, 0)
|
||||
pgStr = `SELECT storage_cids.cid, state_cids.state_leaf_key, storage_cids.storage_leaf_key, storage_cids.node_type, storage_cids.storage_path
|
||||
FROM eth.storage_cids, eth.state_cids, eth.header_cids
|
||||
WHERE storage_cids.state_id = state_cids.id
|
||||
AND state_cids.header_id = header_cids.id
|
||||
AND header_cids.block_number = $1
|
||||
AND storage_cids.node_type = 3`
|
||||
err = db.Select(&storageNodes, pgStr, mocks.BlockNumber.Uint64())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shared.ExpectEqual(t, len(storageNodes), 1)
|
||||
shared.ExpectEqual(t, storageNodes[0], models.StorageNodeWithStateKeyModel{
|
||||
CID: indexer.RemovedNodeStorageCID,
|
||||
NodeType: 3,
|
||||
StorageKey: common.BytesToHash(mocks.RemovedLeafKey).Hex(),
|
||||
StateKey: common.BytesToHash(mocks.ContractLeafKey).Hex(),
|
||||
Path: []byte{'\x03'},
|
||||
})
|
||||
dc, err = cid.Decode(storageNodes[0].CID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mhKey = dshelp.MultihashToDsKey(dc.Hash())
|
||||
prefixedKey = blockstore.BlockPrefix.String() + mhKey.String()
|
||||
shared.ExpectEqual(t, prefixedKey, indexer.RemovedNodeMhKey)
|
||||
err = db.Get(&data, ipfsPgGet, prefixedKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shared.ExpectEqual(t, data, []byte{})
|
||||
})
|
||||
}
|
||||
|
||||
@ -31,6 +31,8 @@ type indexerMetricsHandles struct {
|
||||
transactions metrics.Counter
|
||||
// The total number of processed receipts
|
||||
receipts metrics.Counter
|
||||
// The total number of processed logs
|
||||
logs metrics.Counter
|
||||
// The total number of access list entries processed
|
||||
accessListEntries metrics.Counter
|
||||
// Time spent waiting for free postgres tx
|
||||
@ -52,6 +54,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
blocks: metrics.NewCounter(),
|
||||
transactions: metrics.NewCounter(),
|
||||
receipts: metrics.NewCounter(),
|
||||
logs: metrics.NewCounter(),
|
||||
accessListEntries: metrics.NewCounter(),
|
||||
tFreePostgres: metrics.NewTimer(),
|
||||
tPostgresCommit: metrics.NewTimer(),
|
||||
@ -64,6 +67,7 @@ func RegisterIndexerMetrics(reg metrics.Registry) indexerMetricsHandles {
|
||||
reg.Register(metricName(subsys, "blocks"), ctx.blocks)
|
||||
reg.Register(metricName(subsys, "transactions"), ctx.transactions)
|
||||
reg.Register(metricName(subsys, "receipts"), ctx.receipts)
|
||||
reg.Register(metricName(subsys, "logs"), ctx.logs)
|
||||
reg.Register(metricName(subsys, "access_list_entries"), ctx.accessListEntries)
|
||||
reg.Register(metricName(subsys, "t_free_postgres"), ctx.tFreePostgres)
|
||||
reg.Register(metricName(subsys, "t_postgres_commit"), ctx.tPostgresCommit)
|
||||
|
||||
@ -129,6 +129,7 @@ var (
|
||||
AccountRoot = "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"
|
||||
AccountCodeHash = common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470")
|
||||
AccountLeafKey = testhelpers.Account2LeafKey
|
||||
RemovedLeafKey = testhelpers.Account1LeafKey
|
||||
Account, _ = rlp.EncodeToBytes(state.Account{
|
||||
Nonce: nonce0,
|
||||
Balance: big.NewInt(1000),
|
||||
@ -154,6 +155,12 @@ var (
|
||||
LeafKey: StorageLeafKey,
|
||||
NodeValue: StorageLeafNode,
|
||||
},
|
||||
{
|
||||
Path: []byte{'\x03'},
|
||||
NodeType: sdtypes.Removed,
|
||||
LeafKey: RemovedLeafKey,
|
||||
NodeValue: []byte{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
@ -163,25 +170,15 @@ var (
|
||||
NodeValue: AccountLeafNode,
|
||||
StorageNodes: []sdtypes.StorageNode{},
|
||||
},
|
||||
{
|
||||
Path: []byte{'\x02'},
|
||||
NodeType: sdtypes.Removed,
|
||||
LeafKey: RemovedLeafKey,
|
||||
NodeValue: []byte{},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
/*
|
||||
// AccessListTx is the data of EIP-2930 access list transactions.
|
||||
type AccessListTx struct {
|
||||
ChainID *big.Int // destination chain ID
|
||||
Nonce uint64 // nonce of sender account
|
||||
GasPrice *big.Int // wei per gas
|
||||
Gas uint64 // gas limit
|
||||
To *common.Address `rlp:"nil"` // nil means contract creation
|
||||
Value *big.Int // wei amount
|
||||
Data []byte // contract invocation input data
|
||||
AccessList AccessList // EIP-2930 access list
|
||||
V, R, S *big.Int // signature values
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
type LegacyData struct {
|
||||
Config *params.ChainConfig
|
||||
BlockNumber *big.Int
|
||||
|
||||
@ -57,7 +57,7 @@ func (r DataType) String() string {
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateDataTypeFromString
|
||||
// GenerateDataTypeFromString returns a DataType from a provided string
|
||||
func GenerateDataTypeFromString(str string) (DataType, error) {
|
||||
switch strings.ToLower(str) {
|
||||
case "full", "f":
|
||||
@ -79,6 +79,7 @@ func GenerateDataTypeFromString(str string) (DataType, error) {
|
||||
}
|
||||
}
|
||||
|
||||
// SupportedDataType returns whether a DataType is supported
|
||||
func SupportedDataType(d DataType) (bool, error) {
|
||||
switch d {
|
||||
case Full:
|
||||
|
||||
@ -20,6 +20,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
@ -96,15 +97,16 @@ func MultihashKeyFromCIDString(c string) (string, error) {
|
||||
}
|
||||
|
||||
// PublishRaw derives a cid from raw bytes and provided codec and multihash type, and writes it to the db tx
|
||||
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, error) {
|
||||
// returns the CID and blockstore prefixed multihash key
|
||||
func PublishRaw(tx *sqlx.Tx, codec, mh uint64, raw []byte) (string, string, error) {
|
||||
c, err := ipld.RawdataToCid(codec, raw, mh)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return "", "", err
|
||||
}
|
||||
dbKey := dshelp.MultihashToDsKey(c.Hash())
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dbKey.String()
|
||||
_, err = tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, prefixedKey, raw)
|
||||
return c.String(), err
|
||||
return c.String(), prefixedKey, err
|
||||
}
|
||||
|
||||
// MultihashKeyFromKeccak256 converts keccak256 hash bytes into a blockstore-prefixed multihash db key string
|
||||
@ -117,8 +119,14 @@ func MultihashKeyFromKeccak256(hash common.Hash) (string, error) {
|
||||
return blockstore.BlockPrefix.String() + dbKey.String(), nil
|
||||
}
|
||||
|
||||
// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database
|
||||
// PublishDirect diretly writes a previously derived mhkey => value pair to the ipld database in the provided tx
|
||||
func PublishDirect(tx *sqlx.Tx, key string, value []byte) error {
|
||||
_, err := tx.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
|
||||
return err
|
||||
}
|
||||
|
||||
// PublishDirectWithDB diretly writes a previously derived mhkey => value pair to the ipld database
|
||||
func PublishDirectWithDB(db *postgres.DB, key string, value []byte) error {
|
||||
_, err := db.Exec(`INSERT INTO public.blocks (key, data) VALUES ($1, $2) ON CONFLICT (key) DO NOTHING`, key, value)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/statediff/types"
|
||||
)
|
||||
|
||||
// Trie struct used to flag node as leaf or not
|
||||
// TrieNode struct used to flag node as leaf or not
|
||||
type TrieNode struct {
|
||||
Path []byte
|
||||
LeafKey common.Hash
|
||||
|
||||
@ -30,7 +30,7 @@ var (
|
||||
nullHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000")
|
||||
)
|
||||
|
||||
// Handles processing and writing of indexed IPLD objects to Postgres
|
||||
// PostgresCIDWriter handles processing and writing of indexed IPLD objects to Postgres
|
||||
type PostgresCIDWriter struct {
|
||||
db *postgres.DB
|
||||
}
|
||||
@ -112,8 +112,8 @@ func (in *PostgresCIDWriter) upsertLogCID(tx *sqlx.Tx, logs []*models.LogsModel,
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting logs entry: %w", err)
|
||||
}
|
||||
indexerMetrics.logs.Inc(1)
|
||||
}
|
||||
// TODO: Add metrics for logs.
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@ -165,7 +165,10 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
indexer = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||
indexer, err = ind.NewStateDiffIndexer(blockChain.Config(), db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
workers := params.NumWorkers
|
||||
if workers == 0 {
|
||||
|
||||
@ -26,12 +26,27 @@ type NodeType string
|
||||
|
||||
const (
|
||||
Unknown NodeType = "Unknown"
|
||||
Leaf NodeType = "Leaf"
|
||||
Extension NodeType = "Extension"
|
||||
Branch NodeType = "Branch"
|
||||
Removed NodeType = "Removed" // used to represent pathes which have been emptied
|
||||
Extension NodeType = "Extension"
|
||||
Leaf NodeType = "Leaf"
|
||||
Removed NodeType = "Removed" // used to represent paths which have been emptied
|
||||
)
|
||||
|
||||
func (n NodeType) Int() int {
|
||||
switch n {
|
||||
case Branch:
|
||||
return 0
|
||||
case Extension:
|
||||
return 1
|
||||
case Leaf:
|
||||
return 2
|
||||
case Removed:
|
||||
return 3
|
||||
default:
|
||||
return -1
|
||||
}
|
||||
}
|
||||
|
||||
// StateNode holds the data for a single state diff node
|
||||
type StateNode struct {
|
||||
NodeType NodeType `json:"nodeType" gencodec:"required"`
|
||||
|
||||
Loading…
Reference in New Issue
Block a user