* Write state diff to CSV (#2) * port statediff from9b7fd9af80/statediff/statediff.go
; minor fixes * integrating state diff extracting, building, and persisting into geth processes * work towards persisting created statediffs in ipfs; based off github.com/vulcanize/eth-block-extractor * Add a state diff service * Remove diff extractor from blockchain * Update imports * Move statediff on/off check to geth cmd config * Update starting state diff service * Add debugging logs for creating diff * Add statediff extractor and builder tests and small refactoring * Start to write statediff to a CSV * Restructure statediff directory * Pull CSV publishing methods into their own file * Reformatting due to go fmt * Add gomega to vendor dir * Remove testing focuses * Update statediff tests to use golang test pkg instead of ginkgo - builder_test - extractor_test - publisher_test * Use hexutil.Encode instead of deprecated common.ToHex * Remove OldValue from DiffBigInt and DiffUint64 fields * Update builder test * Remove old storage value from updated accounts * Remove old values from created/deleted accounts * Update publisher to account for only storing current account values * Update service loop and fetching previous block * Update testing - remove statediff ginkgo test suite file - move mocks to their own dir * Updates per go fmt * Updates to tests * Pass statediff mode and path in through cli * Return filename from publisher * Remove some duplication in builder * Remove code field from state diff output this is the contract byte code, and it can still be obtained by querying the db by the codeHash * Consolidate acct diff structs for updated & updated/deleted accts * Include block number in csv filename * Clean up error logging * Cleanup formatting, spelling, etc * Address PR comments * Add contract address and storage value to csv * Refactor accumulating account row in csv publisher * Add DiffStorage struct * Add storage key to csv * Address PR comments * Fix publisher to include rows for accounts that don't have store updates * Update builder test after 
merging in release/1.8 * Update test contract to include storage on contract intialization - so that we're able to test that storage diffing works for created and deleted accounts (not just updated accounts). * Factor out a common trie iterator method in builder * Apply goimports to statediff * Apply gosimple changes to statediff * Gracefully exit geth command(#4) * Statediff for full node (#6) * Open a trie from the in-memory database * Use a node's LeafKey as an identifier instead of the address It was proving difficult to find look the address up from a given path with a full node (sometimes the value wouldn't exist in the disk db). So, instead, for now we are using the node's LeafKey with is a Keccak256 hash of the address, so if we know the address we can figure out which LeafKey it matches up to. * Make sure that statediff has been processed before pruning * Use blockchain stateCache.OpenTrie for storage diffs * Clean up log lines and remove unnecessary fields from builder * Apply go fmt changes * Add a sleep to the blockchain test * refactoring/reorganizing packages * refactoring statediff builder and types and adjusted to relay proofs and paths (still need to make this optional) * refactoring state diff service and adding api which allows for streaming state diff payloads over an rpc websocket subscription * make proofs and paths optional + compress service loop into single for loop (may be missing something here) * option to process intermediate nodes * make state diff rlp serializable * cli parameter to limit statediffing to select account addresses + test * review fixes and fixes for issues ran into in integration * review fixes; proper method signature for api; adjust service so that statediff processing is halted/paused until there is at least one subscriber listening for the results * adjust buffering to improve stability; doc.go; fix notifier err handling * relay receipts with the rest of the data + review fixes/changes * rpc method to get statediff 
at specific block; requires archival node or the block be within the pruning range * fix linter issues * include total difficulty to the payload * fix state diff builder: emit actual leaf nodes instead of value nodes; diff on the leaf not on the value; emit correct path for intermediate nodes * adjust statediff builder tests to changes and extend to test intermediate nodes; golint * add genesis block to test; handle block 0 in StateDiffAt * rlp files for mainnet blocks 0-3, for tests * builder test on mainnet blocks * common.BytesToHash(path) => crypto.Keaccak256(hash) in builder; BytesToHash produces same hash for e.g. []byte{} and []byte{\x00} - prefix \x00 steps are inconsequential to the hash result * complete tests for early mainnet blocks * diff type for representing deleted accounts * fix builder so that we handle account deletions properly and properly diff storage when an account is moved to a new path; update params * remove cli params; moving them to subscriber defined * remove unneeded bc methods * update service and api; statediffing params are now defined by user through api rather than by service provider by cli * update top level tests * add ability to watch specific storage slots (leaf keys) only * comments; explain logic * update mainnet blocks test * update api_test.go * storage leafkey filter test * cleanup chain maker * adjust chain maker for tests to add an empty account in block1 and switch to EIP-158 afterwards (now we just need to generate enough accounts until one causes the empty account to be touched and removed post-EIP-158 so we can simulate and test that process...); also added 2 new blocks where more contract storage is set and old slots are set to zero so they are removed so we can test that * found an account whose creation causes the empty account to be moved to a new path; this should count as 'touching; the empty account and cause it to be removed according to eip-158... 
but it doesn't * use new contract in unit tests that has self-destruct ability, so we can test eip-158 since simply moving an account to new path doesn't count as 'touchin' it * handle storage deletions * tests for eip-158 account removal and storage value deletions; there is one edge case left to test where we remove 1 account when only two exist such that the remaining account is moved up and replaces the root branch node * finish testing known edge cases * add endpoint to fetch all state and storage nodes at a given blockheight; useful for generating a recent atate cache/snapshot that we can diff forward from rather than needing to collect all diffs from genesis * test for state trie builder * if statediffing is on, lock tries in triedb until the statediffing service signals they are done using them * fix mock blockchain; golint; bump patch * increase maxRequestContentLength; bump patch * log the sizes of the state objects we are sending * CI build (#20) * CI: run build on PR and on push to master * CI: debug building geth * CI: fix coping file * CI: fix coping file v2 * CI: temporary upload file to release asset * CI: get release upload_url by tag, upload asset to current relase * CI: fix tag name * fix ci build on statediff_at_anyblock-1.9.11 branch * fix publishing assets in release * use context deadline for timeout in eth_call * collect and emit codehash=>code mappings for state objects * subscription endpoint for retrieving all the codehash=>code mappings that exist at provided height * Implement WriteStateDiffAt * Writes state diffs directly to postgres * Adds CLI flags to configure PG * Refactors builder output with callbacks * Copies refactored postgres handling code from ipld-eth-indexer * rename PostgresCIDWriter.{index->upsert}* * go.mod update * rm unused * cleanup * output code & codehash iteratively * had to rf some types for this * prometheus metrics output * duplicate recent eth-indexer changes * migrations and metrics... 
* [wip] prom.Init() here? another CLI flag? * tidy & DRY * statediff WriteLoop service + CLI flag * [wip] update test mocks * todo - do something meaningful to test write loop * logging * use geth log * port tests to go testing * drop ginkgo/gomega * fix and cleanup tests * fail before defer statement * delete vendor/ dir * fixes after rebase onto 1.9.23 * fix API registration * use golang 1.15.5 version (#34) * bump version meta; add 0.0.11 branch to actions * bump version meta; update github actions workflows * statediff: refactor metrics * Remove redundant statediff/indexer/prom tooling and use existing prometheus integration. * "indexer" namespace for metrics * add reporting loop for db metrics * doc * metrics for statediff stats * metrics namespace/subsystem = statediff/{indexer,service} * statediff: use a worker pool (for direct writes) * fix test * fix chain event subscription * log tweaks * func name * unused import * intermediate chain event channel for metrics * update github actions; linting * add poststate and status to receipt ipld indexes * stateDiffFor endpoints for fetching or writing statediff object by blockhash; bump statediff version * fixes after rebase on to v1.10.1 * update github actions and version meta; go fmt * add leaf key to removed 'nodes' * include Postgres migrations and schema * service documentation * touching up update github actions after rebase fix connection leak (misplaced defer) and perform proper rollback on errs improve error logging; handle PushBlock internal err * build docker image and publish it to Docker Hub on release * add access list tx to unit tests * MarshalBinary and UnmarshalBinary methods for receipt * fix error caused by 2718 by using MarshalBinary instead of EncodeRLP methods * ipld encoding/decoding tests * update TxModel; add AccessListElementModel * index tx type and access lists * add access list metrics * unit tests for tx_type and access list table * unit tests for receipt marshal/unmarshal binary 
methods * improve documentation of the encoding methods * fix issue identified in linting update github actions and version meta after rebase unit test that fails undeterministically on eip2930 txs, giving same error we are seeing in prod fix bug Include genesis block state diff. Fix linting issue. documentation on versioning, rebasing, releasing; bump version meta Add geth and statediff unit test to CI. Set pgpassword in env. Added comments. Add new major branch to github action. Fix failing test. Fix lint errors. Add support for Dynamic txn(EIP-1559). Update version meta to 0.0.24 Verify block base fee in test. Fix base_fee type and add backward compatible test. Remove type definition for AccessListElementModel Change basefee to int64/bigint. block and uncle reward in PoA network = 0 (#87) * in PoA networks there is no block and uncle rewards * bump meta version (cherry picked from commitb64ca14689
) Use Ropsten to test block reward. Add Makefile target to build static linux binaries. Strip symbol tables from static binaries. Fix block_fee to support NULL values. bump version meta. Add new major branch to github action. Add new major branch to github action. Add new major branch to github action. Add new major branch to github action. rename doc.go to README.md Create a seperate table for storing logs Self review Bump statediff version to 0.0.26. add btree index to state/storage_cids.node_type; updated schema Dedup receipt data. Fix linter errors. Address comments. Bump statediff version to 0.0.27. new cli flag for initializing db first time service is ran only write Removed node ipld block (on db init) and reuse constant cid and mhkey linting test new handling of Removed nodes; don't require init flag log metrics Add new major branch to github action. Fix build. Update golang version in CI. Use ipld-eth-db in testing. Remove migration from repo. Add new major branch to github action. Use `GetTd` instead of `GetTdByHash`6289137827
Add new major branch to github action. Report DB metrics Address comments Add new major branch to github action. Address comments. Retry aborted transaction when the deadlock is detected. Fix lint error. bump statediff version Add new major branch to github action. log processing bug fix mainnet tests for problematic blocks (edge cases) fix log trie fk bug fix rct unit tests Add new major branch to github action. minor unit tests refactor bump statediff version fix README formatting run statediff service when the flag is turned on
552 lines
19 KiB
Go
552 lines
19 KiB
Go
// VulcanizeDB
|
|
// Copyright © 2019 Vulcanize
|
|
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
// Package indexer provides an interface for pushing and indexing IPLD objects into a Postgres database
|
|
// Metrics for reporting processing and connection stats are defined in ./metrics.go
|
|
package indexer
|
|
|
|
import (
|
|
"fmt"
|
|
"math/big"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
node "github.com/ipfs/go-ipld-format"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/multiformats/go-multihash"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/core/types"
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/ethereum/go-ethereum/log"
|
|
"github.com/ethereum/go-ethereum/metrics"
|
|
"github.com/ethereum/go-ethereum/params"
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/models"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
|
|
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
|
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
|
|
)
|
|
|
|
// Package-level metrics handles, registered against geth's default metrics
// registry at package load time (see metrics.go for the definitions).
var (
	indexerMetrics = RegisterIndexerMetrics(metrics.DefaultRegistry)
	dbMetrics      = RegisterDBMetrics(metrics.DefaultRegistry)
)

// Constant CID and multihash-key values shared by every "Removed" state/storage
// node row. The corresponding empty IPLD block is written once, at indexer
// construction (see NewStateDiffIndexer), so Removed-node rows can all
// reference the same public.blocks entry instead of re-publishing it.
const (
	RemovedNodeStorageCID = "bagmacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
	RemovedNodeStateCID   = "baglacgzayxjemamg64rtzet6pwznzrydydsqbnstzkbcoo337lmaixmfurya"
	RemovedNodeMhKey      = "/blocks/DMQMLUSGAGDPOIZ4SJ7H3MW4Y4B4BZIAWZJ4VARHHN57VWAELWC2I4A"
)
|
|
|
|
// Indexer interface to allow substitution of mocks for testing
type Indexer interface {
	// PushBlock publishes and indexes the block's header, uncles, transactions,
	// and receipts, returning an open BlockTx whose Close must be called to
	// commit or roll back the underlying db transaction.
	PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error)
	// PushStateNode publishes and indexes one state diff node (and any attached
	// storage nodes) within the transaction held by tx.
	PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error
	// PushCodeAndCodeHash publishes a codehash=>code pairing to the ipld database.
	PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error
	// ReportDBMetrics starts a goroutine that reports db connection stats on the
	// given interval until quit is signaled.
	ReportDBMetrics(delay time.Duration, quit <-chan bool)
}
|
|
|
|
// StateDiffIndexer satisfies the Indexer interface for ethereum statediff objects
type StateDiffIndexer struct {
	chainConfig *params.ChainConfig // used for receipt field derivation, signer selection, and PoA (Clique) reward rules
	dbWriter    *PostgresCIDWriter  // performs the actual upserts against Postgres
	init        bool                // NOTE(review): not referenced anywhere in this file — possibly vestigial; confirm before removing
}
|
|
|
|
// NewStateDiffIndexer creates a pointer to a new PayloadConverter which satisfies the PayloadConverter interface
|
|
func NewStateDiffIndexer(chainConfig *params.ChainConfig, db *postgres.DB) (*StateDiffIndexer, error) {
|
|
// Write the removed node to the db on init
|
|
if err := shared.PublishDirectWithDB(db, RemovedNodeMhKey, []byte{}); err != nil {
|
|
return nil, err
|
|
}
|
|
return &StateDiffIndexer{
|
|
chainConfig: chainConfig,
|
|
dbWriter: NewPostgresCIDWriter(db),
|
|
}, nil
|
|
}
|
|
|
|
// BlockTx wraps a Postgres transaction together with the block metadata needed
// to index state and storage nodes against the already-indexed header.
type BlockTx struct {
	dbtx        *sqlx.Tx
	BlockNumber uint64
	headerID    int64 // FK referenced by state node rows written via PushStateNode
	// Close commits the wrapped transaction when the passed err is nil, rolls it
	// back otherwise; it must be called (typically deferred) for every BlockTx
	// returned by PushBlock.
	Close func(err error) error
}
|
|
|
|
// ReportDBMetrics is a reporting function to run as goroutine
|
|
func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bool) {
|
|
if !metrics.Enabled {
|
|
return
|
|
}
|
|
ticker := time.NewTicker(delay)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
dbMetrics.Update(sdi.dbWriter.db.Stats())
|
|
case <-quit:
|
|
ticker.Stop()
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// PushBlock pushes and indexes block data in database, except state & storage nodes (includes header, uncles, transactions & receipts)
|
|
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
|
|
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (*BlockTx, error) {
|
|
start, t := time.Now(), time.Now()
|
|
blockHash := block.Hash()
|
|
blockHashStr := blockHash.String()
|
|
height := block.NumberU64()
|
|
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
|
|
transactions := block.Transactions()
|
|
// Derive any missing fields
|
|
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, transactions); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Generate the block iplds
|
|
headerNode, uncleNodes, txNodes, txTrieNodes, rctNodes, rctTrieNodes, logTrieNodes, logLeafNodeCIDs, rctLeafNodeCIDs, err := ipld.FromBlockAndReceipts(block, receipts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
|
|
}
|
|
|
|
if len(txNodes) != len(rctNodes) || len(rctNodes) != len(rctLeafNodeCIDs) {
|
|
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d), and receipt trie leaf nodes (%d)to be equal", len(txNodes), len(rctNodes), len(rctLeafNodeCIDs))
|
|
}
|
|
|
|
// Calculate reward
|
|
var reward *big.Int
|
|
// in PoA networks block reward is 0
|
|
if sdi.chainConfig.Clique != nil {
|
|
reward = big.NewInt(0)
|
|
} else {
|
|
reward = CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
|
|
}
|
|
t = time.Now()
|
|
// Begin new db tx for everything
|
|
tx, err := sdi.dbWriter.db.Beginx()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if p := recover(); p != nil {
|
|
shared.Rollback(tx)
|
|
panic(p)
|
|
} else if err != nil {
|
|
shared.Rollback(tx)
|
|
}
|
|
}()
|
|
blockTx := &BlockTx{
|
|
dbtx: tx,
|
|
// handle transaction commit or rollback for any return case
|
|
Close: func(err error) error {
|
|
if p := recover(); p != nil {
|
|
shared.Rollback(tx)
|
|
panic(p)
|
|
} else if err != nil {
|
|
shared.Rollback(tx)
|
|
} else {
|
|
tDiff := time.Since(t)
|
|
indexerMetrics.tStateStoreCodeProcessing.Update(tDiff)
|
|
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
|
|
t = time.Now()
|
|
err = tx.Commit()
|
|
tDiff = time.Since(t)
|
|
indexerMetrics.tPostgresCommit.Update(tDiff)
|
|
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
|
|
}
|
|
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
|
|
log.Debug(traceMsg)
|
|
return err
|
|
},
|
|
}
|
|
tDiff := time.Since(t)
|
|
indexerMetrics.tFreePostgres.Update(tDiff)
|
|
|
|
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
|
|
t = time.Now()
|
|
|
|
// Publish and index header, collect headerID
|
|
var headerID int64
|
|
headerID, err = sdi.processHeader(tx, block.Header(), headerNode, reward, totalDifficulty)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tDiff = time.Since(t)
|
|
indexerMetrics.tHeaderProcessing.Update(tDiff)
|
|
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
|
|
t = time.Now()
|
|
// Publish and index uncles
|
|
err = sdi.processUncles(tx, headerID, height, uncleNodes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tDiff = time.Since(t)
|
|
indexerMetrics.tUncleProcessing.Update(tDiff)
|
|
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
|
|
t = time.Now()
|
|
// Publish and index receipts and txs
|
|
err = sdi.processReceiptsAndTxs(tx, processArgs{
|
|
headerID: headerID,
|
|
blockNumber: block.Number(),
|
|
receipts: receipts,
|
|
txs: transactions,
|
|
rctNodes: rctNodes,
|
|
rctTrieNodes: rctTrieNodes,
|
|
txNodes: txNodes,
|
|
txTrieNodes: txTrieNodes,
|
|
logTrieNodes: logTrieNodes,
|
|
logLeafNodeCIDs: logLeafNodeCIDs,
|
|
rctLeafNodeCIDs: rctLeafNodeCIDs,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
tDiff = time.Since(t)
|
|
indexerMetrics.tTxAndRecProcessing.Update(tDiff)
|
|
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
|
|
t = time.Now()
|
|
|
|
blockTx.BlockNumber = height
|
|
blockTx.headerID = headerID
|
|
return blockTx, err
|
|
}
|
|
|
|
// processHeader publishes and indexes a header IPLD in Postgres
|
|
// it returns the headerID
|
|
func (sdi *StateDiffIndexer) processHeader(tx *sqlx.Tx, header *types.Header, headerNode node.Node, reward, td *big.Int) (int64, error) {
|
|
// publish header
|
|
if err := shared.PublishIPLD(tx, headerNode); err != nil {
|
|
return 0, fmt.Errorf("error publishing header IPLD: %v", err)
|
|
}
|
|
|
|
var baseFee *int64
|
|
if header.BaseFee != nil {
|
|
baseFee = new(int64)
|
|
*baseFee = header.BaseFee.Int64()
|
|
}
|
|
|
|
// index header
|
|
return sdi.dbWriter.upsertHeaderCID(tx, models.HeaderModel{
|
|
CID: headerNode.Cid().String(),
|
|
MhKey: shared.MultihashKeyFromCID(headerNode.Cid()),
|
|
ParentHash: header.ParentHash.String(),
|
|
BlockNumber: header.Number.String(),
|
|
BlockHash: header.Hash().String(),
|
|
TotalDifficulty: td.String(),
|
|
Reward: reward.String(),
|
|
Bloom: header.Bloom.Bytes(),
|
|
StateRoot: header.Root.String(),
|
|
RctRoot: header.ReceiptHash.String(),
|
|
TxRoot: header.TxHash.String(),
|
|
UncleRoot: header.UncleHash.String(),
|
|
Timestamp: header.Time,
|
|
BaseFee: baseFee,
|
|
})
|
|
}
|
|
|
|
// processUncles publishes and indexes uncle IPLDs in Postgres
|
|
func (sdi *StateDiffIndexer) processUncles(tx *sqlx.Tx, headerID int64, blockNumber uint64, uncleNodes []*ipld.EthHeader) error {
|
|
// publish and index uncles
|
|
for _, uncleNode := range uncleNodes {
|
|
if err := shared.PublishIPLD(tx, uncleNode); err != nil {
|
|
return fmt.Errorf("error publishing uncle IPLD: %v", err)
|
|
}
|
|
var uncleReward *big.Int
|
|
// in PoA networks uncle reward is 0
|
|
if sdi.chainConfig.Clique != nil {
|
|
uncleReward = big.NewInt(0)
|
|
} else {
|
|
uncleReward = CalcUncleMinerReward(blockNumber, uncleNode.Number.Uint64())
|
|
}
|
|
uncle := models.UncleModel{
|
|
CID: uncleNode.Cid().String(),
|
|
MhKey: shared.MultihashKeyFromCID(uncleNode.Cid()),
|
|
ParentHash: uncleNode.ParentHash.String(),
|
|
BlockHash: uncleNode.Hash().String(),
|
|
Reward: uncleReward.String(),
|
|
}
|
|
if err := sdi.dbWriter.upsertUncleCID(tx, uncle, headerID); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// processArgs bundles arguments to processReceiptsAndTxs
type processArgs struct {
	headerID        int64              // FK of the already-indexed header row
	blockNumber     *big.Int           // used to select the correct tx signer
	receipts        types.Receipts
	txs             types.Transactions // parallel to receipts: txs[i] produced receipts[i]
	rctNodes        []*ipld.EthReceipt
	rctTrieNodes    []*ipld.EthRctTrie
	txNodes         []*ipld.EthTx
	txTrieNodes     []*ipld.EthTxTrie
	logTrieNodes    [][]node.Node // per-receipt log trie nodes
	logLeafNodeCIDs [][]cid.Cid   // per-receipt, per-log leaf CIDs
	rctLeafNodeCIDs []cid.Cid     // one receipt trie leaf CID per receipt
}
|
|
|
|
// processReceiptsAndTxs publishes and indexes receipt and transaction IPLDs in Postgres.
// Transactions are indexed before their receipts so the receipt rows can reference
// the tx row by FK; log rows likewise reference their receipt row.
func (sdi *StateDiffIndexer) processReceiptsAndTxs(tx *sqlx.Tx, args processArgs) error {
	// Process receipts and txs
	signer := types.MakeSigner(sdi.chainConfig, args.blockNumber)
	for i, receipt := range args.receipts {
		// tx that corresponds with this receipt
		trx := args.txs[i]
		from, err := types.Sender(signer, trx)
		if err != nil {
			return fmt.Errorf("error deriving tx sender: %v", err)
		}

		// publish this receipt's log trie nodes
		for _, trie := range args.logTrieNodes[i] {
			if err = shared.PublishIPLD(tx, trie); err != nil {
				return fmt.Errorf("error publishing log trie node IPLD: %w", err)
			}
		}

		// publish the txs and receipts
		txNode := args.txNodes[i]
		if err := shared.PublishIPLD(tx, txNode); err != nil {
			return fmt.Errorf("error publishing tx IPLD: %v", err)
		}

		// Indexing
		// extract topic and contract data from the receipt for indexing
		mappedContracts := make(map[string]bool) // use map to avoid duplicate addresses
		logDataSet := make([]*models.LogsModel, len(receipt.Logs))
		for idx, l := range receipt.Logs {
			// a log has at most 4 topics; missing topics are stored as empty strings
			topicSet := make([]string, 4)
			for ti, topic := range l.Topics {
				topicSet[ti] = topic.Hex()
			}

			if !args.logLeafNodeCIDs[i][idx].Defined() {
				return fmt.Errorf("invalid log cid")
			}

			mappedContracts[l.Address.String()] = true
			logDataSet[idx] = &models.LogsModel{
				ID:        0,
				Address:   l.Address.String(),
				Index:     int64(l.Index),
				Data:      l.Data,
				LeafCID:   args.logLeafNodeCIDs[i][idx].String(),
				LeafMhKey: shared.MultihashKeyFromCID(args.logLeafNodeCIDs[i][idx]),
				Topic0:    topicSet[0],
				Topic1:    topicSet[1],
				Topic2:    topicSet[2],
				Topic3:    topicSet[3],
			}
		}
		// these are the contracts seen in the logs
		// NOTE(review): logContracts is built but never consumed below — looks like
		// dead code left over from an earlier schema; confirm before removing.
		logContracts := make([]string, 0, len(mappedContracts))
		for addr := range mappedContracts {
			logContracts = append(logContracts, addr)
		}
		// this is the contract address if this receipt is for a contract creation tx
		contract := shared.HandleZeroAddr(receipt.ContractAddress)
		var contractHash string
		if contract != "" {
			contractHash = crypto.Keccak256Hash(common.HexToAddress(contract).Bytes()).String()
		}
		// index tx first so that the receipt can reference it by FK
		txModel := models.TxModel{
			Dst:    shared.HandleZeroAddrPointer(trx.To()),
			Src:    shared.HandleZeroAddr(from),
			TxHash: trx.Hash().String(),
			Index:  int64(i),
			Data:   trx.Data(),
			CID:    txNode.Cid().String(),
			MhKey:  shared.MultihashKeyFromCID(txNode.Cid()),
		}
		// tx type column is NULL for legacy txs
		txType := trx.Type()
		if txType != types.LegacyTxType {
			txModel.Type = &txType
		}
		txID, err := sdi.dbWriter.upsertTransactionCID(tx, txModel, args.headerID)
		if err != nil {
			return err
		}

		// index access list if this is one
		for j, accessListElement := range trx.AccessList() {
			storageKeys := make([]string, len(accessListElement.StorageKeys))
			for k, storageKey := range accessListElement.StorageKeys {
				storageKeys[k] = storageKey.Hex()
			}
			accessListElementModel := models.AccessListElementModel{
				Index:       int64(j),
				Address:     accessListElement.Address.Hex(),
				StorageKeys: storageKeys,
			}
			if err := sdi.dbWriter.upsertAccessListElement(tx, accessListElementModel, txID); err != nil {
				return err
			}
		}

		// index the receipt
		if !args.rctLeafNodeCIDs[i].Defined() {
			return fmt.Errorf("invalid receipt leaf node cid")
		}

		rctModel := &models.ReceiptModel{
			Contract:     contract,
			ContractHash: contractHash,
			LeafCID:      args.rctLeafNodeCIDs[i].String(),
			LeafMhKey:    shared.MultihashKeyFromCID(args.rctLeafNodeCIDs[i]),
			LogRoot:      args.rctNodes[i].LogRoot.String(),
		}
		// pre-Byzantium receipts carry a post-state root; later receipts carry a status code
		if len(receipt.PostState) == 0 {
			rctModel.PostStatus = receipt.Status
		} else {
			rctModel.PostState = common.Bytes2Hex(receipt.PostState)
		}

		receiptID, err := sdi.dbWriter.upsertReceiptCID(tx, rctModel, txID)
		if err != nil {
			return err
		}

		if err = sdi.dbWriter.upsertLogCID(tx, logDataSet, receiptID); err != nil {
			return err
		}
	}

	// publish trie nodes, these aren't indexed directly
	for _, n := range args.txTrieNodes {
		if err := shared.PublishIPLD(tx, n); err != nil {
			return fmt.Errorf("error publishing tx trie node IPLD: %w", err)
		}
	}

	for _, n := range args.rctTrieNodes {
		if err := shared.PublishIPLD(tx, n); err != nil {
			return fmt.Errorf("error publishing rct trie node IPLD: %w", err)
		}
	}

	return nil
}
|
|
|
|
// PushStateNode publishes and indexes a state diff node object (including any child storage nodes) in the IPLD database
func (sdi *StateDiffIndexer) PushStateNode(tx *BlockTx, stateNode sdtypes.StateNode) error {
	// publish the state node
	if stateNode.NodeType == sdtypes.Removed {
		// short circuit if it is a Removed node
		// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
		stateModel := models.StateNodeModel{
			Path:     stateNode.Path,
			StateKey: common.BytesToHash(stateNode.LeafKey).String(),
			CID:      RemovedNodeStateCID, // constant CID shared by all removed state nodes
			MhKey:    RemovedNodeMhKey,
			NodeType: stateNode.NodeType.Int(),
		}
		_, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
		return err
	}
	// publish the raw state trie node and get back its CID/mhkey for indexing
	stateCIDStr, stateMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStateTrie, multihash.KECCAK_256, stateNode.NodeValue)
	if err != nil {
		return fmt.Errorf("error publishing state node IPLD: %v", err)
	}
	stateModel := models.StateNodeModel{
		Path:     stateNode.Path,
		StateKey: common.BytesToHash(stateNode.LeafKey).String(),
		CID:      stateCIDStr,
		MhKey:    stateMhKey,
		NodeType: stateNode.NodeType.Int(),
	}
	// index the state node, collect the stateID to reference by FK
	stateID, err := sdi.dbWriter.upsertStateCID(tx.dbtx, stateModel, tx.headerID)
	if err != nil {
		return err
	}
	// if we have a leaf, decode and index the account data
	if stateNode.NodeType == sdtypes.Leaf {
		// a state leaf node RLP-decodes to a two-element list: [partial path, rlp(account)]
		var i []interface{}
		if err := rlp.DecodeBytes(stateNode.NodeValue, &i); err != nil {
			return fmt.Errorf("error decoding state leaf node rlp: %s", err.Error())
		}
		if len(i) != 2 {
			return fmt.Errorf("eth IPLDPublisher expected state leaf node rlp to decode into two elements")
		}
		var account types.StateAccount
		if err := rlp.DecodeBytes(i[1].([]byte), &account); err != nil {
			return fmt.Errorf("error decoding state account rlp: %s", err.Error())
		}
		accountModel := models.StateAccountModel{
			Balance:     account.Balance.String(),
			Nonce:       account.Nonce,
			CodeHash:    account.CodeHash,
			StorageRoot: account.Root.String(),
		}
		if err := sdi.dbWriter.upsertStateAccount(tx.dbtx, accountModel, stateID); err != nil {
			return err
		}
	}
	// if there are any storage nodes associated with this node, publish and index them
	for _, storageNode := range stateNode.StorageNodes {
		if storageNode.NodeType == sdtypes.Removed {
			// short circuit if it is a Removed node
			// this assumes the db has been initialized and a public.blocks entry for the Removed node is present
			storageModel := models.StorageNodeModel{
				Path:       storageNode.Path,
				StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
				CID:        RemovedNodeStorageCID, // constant CID shared by all removed storage nodes
				MhKey:      RemovedNodeMhKey,
				NodeType:   storageNode.NodeType.Int(),
			}
			if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
				return err
			}
			continue
		}
		// publish the raw storage trie node, then index it against the parent state node
		storageCIDStr, storageMhKey, err := shared.PublishRaw(tx.dbtx, ipld.MEthStorageTrie, multihash.KECCAK_256, storageNode.NodeValue)
		if err != nil {
			return fmt.Errorf("error publishing storage node IPLD: %v", err)
		}
		storageModel := models.StorageNodeModel{
			Path:       storageNode.Path,
			StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
			CID:        storageCIDStr,
			MhKey:      storageMhKey,
			NodeType:   storageNode.NodeType.Int(),
		}
		if err := sdi.dbWriter.upsertStorageCID(tx.dbtx, storageModel, stateID); err != nil {
			return err
		}
	}

	return nil
}
|
|
|
|
// PushCodeAndCodeHash publishes code and codehash pairs to the ipld database
|
|
func (sdi *StateDiffIndexer) PushCodeAndCodeHash(tx *BlockTx, codeAndCodeHash sdtypes.CodeAndCodeHash) error {
|
|
// codec doesn't matter since db key is multihash-based
|
|
mhKey, err := shared.MultihashKeyFromKeccak256(codeAndCodeHash.Hash)
|
|
if err != nil {
|
|
return fmt.Errorf("error deriving multihash key from codehash: %v", err)
|
|
}
|
|
if err := shared.PublishDirect(tx.dbtx, mhKey, codeAndCodeHash.Code); err != nil {
|
|
return fmt.Errorf("error publishing code IPLD: %v", err)
|
|
}
|
|
return nil
|
|
}
|