Use symmetric difference iterator #11

Merged
roysc merged 27 commits from symmetric-diff-iterator into main 2023-09-20 03:22:19 +00:00
26 changed files with 865 additions and 702 deletions

View File

@ -11,49 +11,49 @@ on:
# Needed until we can incorporate docker startup into the executor container
env:
DOCKER_HOST: unix:///var/run/dind.sock
SO_VERSION: v1.1.0-c30c779-202309082138
jobs:
unit-tests:
name: "Run unit tests"
name: Run unit tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v3
- uses: actions/setup-go@v4
with:
go-version-file: 'go.mod'
check-latest: true
- name: "Run dockerd"
- name: Run dockerd
run: |
dockerd -H $DOCKER_HOST --userland-proxy=false &
sleep 5
- name: "Run DB container"
- name: Run DB container
run: |
docker compose -f test/compose.yml up --wait
- name: "Set up Gitea access token"
- name: Set up Gitea access token
env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: |
git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/
- name: "Run tests"
run: go test -v ./...
- name: Run tests
run: go test -p 1 -v ./...
integration-tests:
name: "Run integration tests"
name: Run integration tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
path: ./plugeth-statediff
# TODO: replace with release
- uses: actions/checkout@v3
with:
repository: cerc-io/plugeth
ref: statediff
path: ./plugeth
- name: "Run dockerd"
- name: Run dockerd
run: dockerd -H $DOCKER_HOST --userland-proxy=false &
# These images need access tokens configured
- name: "Build docker image"
- name: Build docker image
env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: |
@ -63,26 +63,27 @@ jobs:
docker build ./plugeth -t cerc/plugeth:local \
--build-arg GIT_VDBTO_TOKEN="$TOKEN"
- name: "Install stack-orchestrator"
uses: actions/checkout@v3
with:
repository: cerc-io/stack-orchestrator
ref: v1.1.0-e856616-202308032031
path: ./stack-orchestrator
- run: |
apt-get update && apt-get install -y python3-pip
pip install ./stack-orchestrator
- name: "Clone system-tests"
- name: Install stack-orchestrator
run: |
curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so
chmod +x laconic-so
- name: Clone system-tests
uses: actions/checkout@v3
with:
repository: cerc-io/system-tests
ref: roy/plugeth-fixes
ref: plugeth-compat
path: ./system-tests
token: ${{ secrets.CICD_REPO_TOKEN }}
- name: "Run testnet stack"
- name: Run testnet stack
working-directory: ./plugeth-statediff
env:
LACONIC_SO: ../laconic-so
run: ./scripts/integration-setup.sh
- name: "Run tests"
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: 3.10
- name: Run tests
working-directory: ./system-tests
run: |
pip install pytest

View File

@ -9,7 +9,8 @@ $(MOCKS_DIR)/gen_backend.go:
github.com/openrelayxyz/plugeth-utils/core Backend,Downloader
docker-image: mocks
docker build . -t "cerc/plugeth-statediff:local"
docker build . -t "cerc/plugeth-statediff:local" \
--build-arg GIT_VDBTO_TOKEN
.PHONY: docker-image
# Local build

View File

@ -21,7 +21,6 @@ package statediff
import (
"bytes"
"encoding/hex"
"fmt"
"time"
@ -35,7 +34,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/trie_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils"
"github.com/cerc-io/plugeth-statediff/utils/log"
@ -44,39 +42,34 @@ import (
var (
emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode)
nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes()
nullCodeHash = crypto.Keccak256([]byte{})
zeroHash common.Hash
)
// Builder interface exposes the method for building a state diff between two blocks
type Builder interface {
BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error)
WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink) error
BuildStateDiffObject(Args, Params) (sdtypes.StateObject, error)
WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
}
type StateDiffBuilder struct {
StateCache adapt.StateView
// state cache is safe for concurrent reads
stateCache adapt.StateView
}
type IterPair struct {
type iterPair struct {
Older, Newer trie.NodeIterator
}
func StateNodeAppender(nodes *[]sdtypes.StateLeafNode) sdtypes.StateNodeSink {
return func(node sdtypes.StateLeafNode) error {
*nodes = append(*nodes, node)
return nil
}
type accountUpdate struct {
new sdtypes.AccountWrapper
oldRoot common.Hash
}
func StorageNodeAppender(nodes *[]sdtypes.StorageLeafNode) sdtypes.StorageNodeSink {
return func(node sdtypes.StorageLeafNode) error {
*nodes = append(*nodes, node)
return nil
}
}
func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink {
return func(c sdtypes.IPLD) error {
*iplds = append(*iplds, c)
type accountUpdateMap map[string]*accountUpdate
func appender[T any](to *[]T) func(T) error {
return func(a T) error {
*to = append(*to, a)
return nil
}
}
@ -84,7 +77,7 @@ func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink {
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder {
return &StateDiffBuilder{
StateCache: stateCache, // state cache is safe for concurrent reads
stateCache: stateCache,
}
}
@ -93,7 +86,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD
err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds))
err := sdb.WriteStateDiff(args, params, appender(&stateNodes), appender(&iplds))
if err != nil {
return sdtypes.StateObject{}, err
}
@ -105,126 +98,110 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
}, nil
}
// WriteStateDiffObject writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffObjectTimer)
// WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiff(
args Args, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot)
oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil {
return fmt.Errorf("error creating trie for oldStateRoot: %w", err)
return fmt.Errorf("error opening old state trie: %w", err)
}
newTrie, err := sdb.StateCache.OpenTrie(args.NewStateRoot)
newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil {
return fmt.Errorf("error creating trie for newStateRoot: %w", err)
return fmt.Errorf("error opening new state trie: %w", err)
}
// we do two state trie iterations:
// one for new/updated nodes,
// one for deleted/updated nodes;
// prepare 2 iterator instances for each task
iterPairs := []IterPair{
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
iters := iterPair{
Older: oldTrie.NodeIterator(nil),
Newer: newTrie.NodeIterator(nil),
}
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
logger := log.New("hash", args.BlockHash.String(), "number", args.BlockNumber)
return sdb.BuildStateDiff(iterPairs, params, output, ipldOutput, logger, nil)
}
func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params,
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) error {
logger.Trace("statediff BEGIN BuildStateDiff")
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger, prefixPath)
err = sdb.processAccounts(
iters.Older, iters.Newer,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger)
if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
}
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
// a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, output, logger, prefixPath)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err)
}
// collect and sort the leafkey keys for both account mappings into a slice
t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
logger.Debug("statediff BuildStateDiff sort", "duration", time.Since(t))
// and then find the intersection of these keys
// these are the leafkeys for the accounts which exist at both A and B but are different
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
// and leaving the truly created or deleted keys in place
t = time.Now()
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
logger.Debug("statediff BuildStateDiff intersection",
"count", len(updatedKeys),
"duration", time.Since(t))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %w", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %w", err)
}
return nil
}
// createdAndUpdatedState returns
// a slice of all the intermediate nodes that exist in a different state at B than A
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte, output sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) {
logger.Trace("statediff BEGIN createdAndUpdatedState")
defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(sdtypes.AccountMap)
// processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key.
func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) error {
logger.Trace("statediff/processAccounts BEGIN")
defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
// cache the RLP of the previous node, so when we hit a leaf we have the parent (containing) node
updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte
it, itCount := trie.NewDifferenceIterator(a, b)
it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
// ignore node if it is not along paths of interest
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
// index values by leaf key
if it.FromA() { // Node exists in the old trie
if it.Leaf() {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return err
}
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if it.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root
} else {
updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
}
} else {
// This node was removed, meaning the account was deleted. Emit empty
// "removed" records for the state node and all storage all storage slots.
err := sdb.processAccountDeletion(leafKey, account, nodeSink)
if err != nil {
return err
}
}
}
continue
}
// Node exists in the new trie
if it.Leaf() {
// if it is a "value" node, we will index the value by leaf key
accountW, err := sdb.processStateValueNode(it, prevBlob)
accountW, err := sdb.decodeStateLeaf(it, prevBlob)
if err != nil {
return nil, err
return err
}
if accountW == nil {
continue
if it.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW
} else {
updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
}
} else { // account was created
err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
if err != nil {
return err
}
}
// for now, just add it to diffAccountsAtB
// we will compare to diffAccountsAtA to determine which diffAccountsAtB
// were creations and which were updates and also identify accounts that were removed going A->B
diffAccountsAtB[hex.EncodeToString(accountW.LeafKey)] = *accountW
} else {
// trie nodes will be written to blockstore only
// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node
// New trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
if it.Hash() == zeroHash {
continue
}
@ -234,41 +211,108 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return nil, err
return err
}
ok, err := isLeaf(elements)
if err != nil {
return nil, err
return err
}
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if ok && !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := output(sdtypes.IPLD{
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return nil, err
return err
}
prevBlob = nodeVal
}
}
logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
for key, update := range updates {
var storageDiff []sdtypes.StorageLeafNode
err := sdb.processStorageUpdates(
update.oldRoot, update.new.Account.Root,
appender(&storageDiff), ipldSink,
)
if err != nil {
return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
if err = nodeSink(sdtypes.StateLeafNode{
AccountWrapper: update.new,
StorageDiff: storageDiff,
}); err != nil {
return err
}
}
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error()
return it.Error()
}
func (sdb *StateDiffBuilder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
LeafKey: leafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
err := sdb.processRemovedAccountStorage(account.Root, appender(&diff.StorageDiff))
if err != nil {
return fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
return nodeSink(diff)
}
func (sdb *StateDiffBuilder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: *accountW,
}
if !bytes.Equal(accountW.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
err := sdb.processStorageCreations(accountW.Account.Root, appender(&diff.StorageDiff), ipldSink)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", accountW.LeafKey, err)
}
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(accountW.Account.CodeHash)
code, err := sdb.stateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %w", codeHash, err)
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
return nodeSink(diff)
}
// decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
}
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
return &sdtypes.AccountWrapper{
LeafKey: it.LeafKey(),
Account: &account,
@ -276,176 +320,34 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentB
}, nil
}
// deletedOrUpdatedState returns a slice of all the paths that are emptied at B
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap,
watchedAddressesLeafPaths [][]byte, output sdtypes.StateNodeSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) {
logger.Trace("statediff BEGIN deletedOrUpdatedState")
defer metrics.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics.IndexerMetrics.DeletedOrUpdatedStateTimer)
diffAccountAtA := make(sdtypes.AccountMap)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, prevBlob)
if err != nil {
return nil, err
}
if accountW == nil {
continue
}
leafKey := hex.EncodeToString(accountW.LeafKey)
diffAccountAtA[leafKey] = *accountW
// if this node's leaf key did not show up in diffAccountsAtB
// that means the account was deleted
// in that case, emit an empty "removed" diff state node
// include empty "removed" diff storage nodes for all the storage slots
if _, ok := diffAccountsAtB[leafKey]; !ok {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
Account: nil,
LeafKey: accountW.LeafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff))
if err != nil {
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
diff.StorageDiff = storageDiff
if err := output(diff); err != nil {
return nil, err
}
}
} else {
prevBlob = make([]byte, len(it.NodeBlob()))
copy(prevBlob, it.NodeBlob())
}
}
return diffAccountAtA, it.Error()
}
// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountUpdates",
"creations", len(creations), "deletions", len(deletions), "updated", len(updatedKeys))
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
deletedAcc := deletions[key]
storageDiff := make([]sdtypes.StorageLeafNode, 0)
if deletedAcc.Account != nil && createdAcc.Account != nil {
err = sdb.buildStorageNodesIncremental(
deletedAcc.Account.Root, createdAcc.Account.Root,
StorageNodeAppender(&storageDiff), ipldOutput,
)
if err != nil {
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
}
if err = output(sdtypes.StateLeafNode{
AccountWrapper: createdAcc,
Removed: false,
StorageDiff: storageDiff,
}); err != nil {
return err
}
delete(creations, key)
delete(deletions, key)
}
return nil
}
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap, output sdtypes.StateNodeSink,
ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountCreations")
defer metrics.ReportAndUpdateDuration("statediff END buildAccountCreations",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountCreationsTimer)
for _, val := range accounts {
diff := sdtypes.StateLeafNode{
AccountWrapper: val,
Removed: false,
}
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldOutput)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err)
}
diff.StorageDiff = storageDiff
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(val.Account.CodeHash)
code, err := sdb.StateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %x\r\n error: %w", codeHash, err)
}
if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
if err := output(diff); err != nil {
return err
}
}
return nil
}
// buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
// processStorageCreations processes the storage node records for a newly created account
// i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) processStorageCreations(
sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
if sr == emptyContractRoot {
return nil
}
log.Debug("Storage root for eventual diff", "root", sr.String())
sTrie, err := sdb.StateCache.OpenTrie(sr)
log.Debug("Storage root for eventual diff", "root", sr)
sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil {
log.Info("error in build storage diff eventual", "error", err)
return err
}
it := sTrie.NodeIterator(make([]byte, 0))
return sdb.buildStorageNodesFromTrie(it, output, ipldOutput)
}
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesFromTrieTimer)
var prevBlob []byte
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) {
if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err := output(storageLeafNode); err != nil {
storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := storageSink(storageLeafNode); err != nil {
return err
}
} else {
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
if err := ipldOutput(sdtypes.IPLD{
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
@ -457,46 +359,88 @@ func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, out
return it.Error()
}
// decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob()))
copy(value, it.LeafBlob())
return sdtypes.StorageLeafNode{
LeafKey: leafKey,
Value: value,
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(),
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageUpdatesTimer)
if newroot == oldroot {
return nil
}
log.Trace("Storage roots for incremental diff", "old", oldroot, "new", newroot)
oldTrie, err := sdb.stateCache.OpenTrie(oldroot)
if err != nil {
return err
}
newTrie, err := sdb.stateCache.OpenTrie(newroot)
if err != nil {
return err
}
var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
if it.FromA() {
if it.Leaf() && !it.CommonPath() {
// If this node's leaf key is absent from B, the storage slot was vacated.
// In that case, emit an empty "removed" storage node record.
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: []byte(it.LeafKey()),
Value: []byte{},
}); err != nil {
return err
}
}
continue
}
if it.Leaf() {
storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := storageSink(storageLeafNode); err != nil {
return err
}
} else {
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
}
}
return it.Error()
}
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output sdtypes.StorageNodeSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
if sr == emptyContractRoot {
return nil
}
log.Debug("Storage root for removed diffs", "root", sr.String())
sTrie, err := sdb.StateCache.OpenTrie(sr)
log.Debug("Storage root for removed diffs", "root", sr)
sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil {
log.Info("error in build removed account storage diffs", "error", err)
return err
}
it := sTrie.NodeIterator(make([]byte, 0))
return sdb.buildRemovedStorageNodesFromTrie(it, output)
}
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
it := sTrie.NodeIterator(nil)
for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if err := output(sdtypes.StorageLeafNode{
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
@ -509,89 +453,20 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
return it.Error()
}
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, newroot common.Hash, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer)
if newroot == oldroot {
return nil
}
log.Trace("Storage roots for incremental diff", "old", oldroot.String(), "new", newroot.String())
oldTrie, err := sdb.StateCache.OpenTrie(oldroot)
if err != nil {
return err
}
newTrie, err := sdb.StateCache.OpenTrie(newroot)
if err != nil {
return err
}
// decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob()))
copy(value, it.LeafBlob())
diffSlotsAtB, err := sdb.createdAndUpdatedStorage(
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), output, ipldOutput)
if err != nil {
return err
return sdtypes.StorageLeafNode{
LeafKey: leafKey,
Value: value,
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(),
}
return sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffSlotsAtB, output)
}
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) (map[string]bool, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStorageTimer)
diffSlotsAtB := make(map[string]bool)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err := output(storageLeafNode); err != nil {
return nil, err
}
diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true
} else {
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
nodeHash := make([]byte, len(it.Hash().Bytes()))
copy(nodeHash, it.Hash().Bytes())
if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, nodeHash).String(),
Content: nodeVal,
}); err != nil {
return nil, err
}
prevBlob = nodeVal
}
}
return diffSlotsAtB, it.Error()
}
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output sdtypes.StorageNodeSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DeletedOrUpdatedStorageTimer)
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if it.Leaf() {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
// if this node's leaf key did not show up in diffSlotsAtB
// that means the storage slot was vacated
// in that case, emit an empty "removed" diff storage node
if _, ok := diffSlotsAtB[hex.EncodeToString(leafKey)]; !ok {
if err := output(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
Value: []byte{},
}); err != nil {
return err
}
}
}
}
return it.Error()
}
// isWatchedPathPrefix checks if a node path is a prefix (ancestor) to one of the watched addresses.

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/test_helpers"
@ -37,8 +36,8 @@ import (
var (
contractLeafKey []byte
emptyDiffs = make([]sdtypes.StateLeafNode, 0)
emptyStorage = make([]sdtypes.StorageLeafNode, 0)
emptyDiffs []sdtypes.StateLeafNode
emptyStorage []sdtypes.StorageLeafNode
block0, block1, block2, block3, block4, block5, block6 *types.Block
builder statediff.Builder
minerAddress = common.HexToAddress("0x0")
@ -796,7 +795,7 @@ func TestBuilder(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())),
test_helpers.RunBuilderTests(t, chain.StateCache(),
tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
@ -1010,7 +1009,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
block2: block2BranchRootNode,
@ -1260,7 +1259,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
@ -1394,7 +1393,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
@ -1597,7 +1596,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
@ -1824,7 +1823,7 @@ func TestBuilderWithMovedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode,
})
@ -2350,7 +2349,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,
block3: block3bBranchRootNode,
@ -2399,10 +2398,9 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{
BlockNumber: block0.Number(),
BlockHash: block0.Hash(),
Nodes: []sdtypes.StateLeafNode{},
IPLDs: []sdtypes.IPLD{}, // there's some kind of weird behavior where if our root node is a leaf node
// even though it is along the path to the watched leaf (necessarily, as it is the root) it doesn't get included
// unconsequential, but kinda odd.
// there's some kind of weird behavior where if our root node is a leaf node even
// though it is along the path to the watched leaf (necessarily, as it is the root)
// it doesn't get included. unconsequential, but kinda odd.
},
},
{
@ -2417,7 +2415,6 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{
BlockNumber: block1.Number(),
BlockHash: block1.Hash(),
Nodes: []sdtypes.StateLeafNode{},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1bBranchRootNode)).String(),
@ -2553,7 +2550,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,
block3: block3bBranchRootNode,

View File

@ -99,6 +99,7 @@ func (p *ParamsWithMutex) CopyParams() Params {
// Args bundles the arguments for the state diff builder
type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash
BlockNumber *big.Int
OldStateRoot, NewStateRoot common.Hash
BlockHash common.Hash
BlockNumber *big.Int
}

2
go.mod
View File

@ -3,6 +3,7 @@ module github.com/cerc-io/plugeth-statediff
go 1.19
require (
github.com/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum v1.11.6
github.com/georgysavva/scany v0.2.9
github.com/golang/mock v1.6.0
@ -122,6 +123,7 @@ require (
)
replace (
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
)

2
go.sum
View File

@ -1,4 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0=

View File

@ -72,65 +72,41 @@ type IndexerMetricsHandles struct {
StateStoreCodeProcessingTimer metrics.Timer
// Fine-grained code timers
BuildStateDiffTimer metrics.Timer
CreatedAndUpdatedStateTimer metrics.Timer
DeletedOrUpdatedStateTimer metrics.Timer
BuildAccountUpdatesTimer metrics.Timer
BuildAccountCreationsTimer metrics.Timer
ResolveNodeTimer metrics.Timer
SortKeysTimer metrics.Timer
FindIntersectionTimer metrics.Timer
OutputTimer metrics.Timer
IPLDOutputTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter
DeletedOrUpdatedStorageTimer metrics.Timer
CreatedAndUpdatedStorageTimer metrics.Timer
BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffObjectTimer metrics.Timer
BuildStorageNodesEventualTimer metrics.Timer
BuildStorageNodesFromTrieTimer metrics.Timer
BuildRemovedAccountStorageNodesTimer metrics.Timer
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer
ProcessAccountsTimer metrics.Timer
OutputTimer metrics.Timer
IPLDOutputTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter
BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffTimer metrics.Timer
ProcessStorageUpdatesTimer metrics.Timer
ProcessStorageCreationsTimer metrics.Timer
ProcessRemovedAccountStorageTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer
}
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{
BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(),
HeaderProcessingTimer: metrics.NewTimer(),
UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(),
BuildStateDiffTimer: metrics.NewTimer(),
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
DeletedOrUpdatedStateTimer: metrics.NewTimer(),
BuildAccountUpdatesTimer: metrics.NewTimer(),
BuildAccountCreationsTimer: metrics.NewTimer(),
ResolveNodeTimer: metrics.NewTimer(),
SortKeysTimer: metrics.NewTimer(),
FindIntersectionTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(),
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffObjectTimer: metrics.NewTimer(),
BuildStorageNodesEventualTimer: metrics.NewTimer(),
BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(),
BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(),
HeaderProcessingTimer: metrics.NewTimer(),
UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(),
ProcessAccountsTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(),
BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffTimer: metrics.NewTimer(),
ProcessStorageUpdatesTimer: metrics.NewTimer(),
ProcessStorageCreationsTimer: metrics.NewTimer(),
ProcessRemovedAccountStorageTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(),
}
subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
@ -144,28 +120,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
reg.Register(metricName(subsys, "t_build_statediff"), ctx.BuildStateDiffTimer)
roysc marked this conversation as resolved Outdated

Are all of these timers irrelevant with the new iterator?

Are all of these timers irrelevant with the new iterator?
Outdated
Review

Some were not actually used anywhere - ResolveNodeTimer, DifferenceIteratorNextTimer (AccessListEntriesCounter is also unused)

The rest are either renamed or deleted along with the functions they tracked.

Removed include:

  • BuildRemovedStorageNodesFromTrieTimer
  • SortKeysTimer
  • FindIntersectionTimer
  • BuildStateDiffObjectTimer
Some were not actually used anywhere - ResolveNodeTimer, DifferenceIteratorNextTimer (AccessListEntriesCounter is also unused) The rest are either renamed or deleted along with the functions they tracked. Removed include: - BuildRemovedStorageNodesFromTrieTimer - SortKeysTimer - FindIntersectionTimer - BuildStateDiffObjectTimer
reg.Register(metricName(subsys, "t_created_and_update_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer)
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)
reg.Register(metricName(subsys, "t_process_accounts"), ctx.ProcessAccountsTimer)
reg.Register(metricName(subsys, "t_process_storage_updates"), ctx.ProcessStorageUpdatesTimer)
reg.Register(metricName(subsys, "t_process_storage_creations"), ctx.ProcessStorageCreationsTimer)
reg.Register(metricName(subsys, "t_process_removed_account_storage"), ctx.ProcessRemovedAccountStorageTimer)
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
log.Debug("Registering statediff indexer metrics.")

View File

@ -227,7 +227,7 @@ var (
CID: AccountLeafNodeCID,
},
Removed: false,
StorageDiff: []sdtypes.StorageLeafNode{},
StorageDiff: nil,
},
{
AccountWrapper: sdtypes.AccountWrapper{
@ -236,7 +236,7 @@ var (
CID: shared.RemovedNodeStateCID,
},
Removed: true,
StorageDiff: []sdtypes.StorageLeafNode{},
StorageDiff: nil,
},
{
AccountWrapper: sdtypes.AccountWrapper{

View File

@ -16,19 +16,14 @@ import (
)
var (
pluginLoader core.PluginLoader
gethContext core.Context
service *statediff.Service
blockchain statediff.BlockChain
gethContext core.Context
service *statediff.Service
blockchain statediff.BlockChain
)
func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
log.SetDefaultLogger(logger)
pluginLoader = pl
gethContext = ctx
log.Debug("Initialized statediff plugin")
}
func InitializeNode(stack core.Node, b core.Backend) {
@ -58,7 +53,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to construct indexer", "error", err)
}
}
service, err := statediff.NewService(serviceConfig, blockchain, backend, indexer)
service, err = statediff.NewService(serviceConfig, blockchain, backend, indexer)
if err != nil {
log.Error("failed to construct service", "error", err)
}
@ -66,6 +61,8 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to start service", "error", err)
return
}
log.Debug("Initialized statediff plugin")
}
func GetAPIs(stack core.Node, backend core.Backend) []core.API {

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -17,13 +17,10 @@
package statediff_test
import (
"bytes"
"io"
"log"
"math/big"
"os"
"testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
@ -35,23 +32,21 @@ import (
"github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/test_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
)
func init() {
test_helpers.SilenceLogs()
test_helpers.QuietLogs()
}
var (
db ethdb.Database
genesisBlock, block0, block1, block2, block3 *types.Block
block1CoinbaseAddr, block2CoinbaseAddr, block3CoinbaseAddr common.Address
block1CoinbaseHash, block2CoinbaseHash, block3CoinbaseHash common.Hash
builder statediff.Builder
emptyStorage = make([]sdtypes.StorageLeafNode, 0)
emptyStorage []sdtypes.StorageLeafNode
// block 1 data
block1CoinbaseAccount = &types.StateAccount{
@ -426,50 +421,19 @@ var (
func init() {
db = rawdb.NewMemoryDatabase()
genesisBlock = core.DefaultGenesisBlock().MustCommit(db)
genBy, err := rlp.EncodeToBytes(genesisBlock)
if err != nil {
log.Fatal(err)
}
var block0RLP []byte
block0, block0RLP, err = loadBlockFromRLPFile("./block0_rlp")
if err != nil {
log.Fatal(err)
}
if !bytes.Equal(genBy, block0RLP) {
log.Fatal("mainnet genesis blocks do not match")
}
block1, _, err = loadBlockFromRLPFile("./block1_rlp")
if err != nil {
log.Fatal(err)
}
block1CoinbaseAddr = block1.Coinbase()
block1CoinbaseHash = crypto.Keccak256Hash(block1CoinbaseAddr.Bytes())
block2, _, err = loadBlockFromRLPFile("./block2_rlp")
if err != nil {
log.Fatal(err)
}
block2CoinbaseAddr = block2.Coinbase()
block2CoinbaseHash = crypto.Keccak256Hash(block2CoinbaseAddr.Bytes()) // 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
block3, _, err = loadBlockFromRLPFile("./block3_rlp")
if err != nil {
log.Fatal(err)
}
block3CoinbaseAddr = block3.Coinbase()
block3CoinbaseHash = crypto.Keccak256Hash(block3CoinbaseAddr.Bytes())
}
func loadBlockFromRLPFile(filename string) (*types.Block, []byte, error) {
f, err := os.Open(filename)
if err != nil {
return nil, nil, err
}
defer f.Close()
blockRLP, err := io.ReadAll(f)
if err != nil {
return nil, nil, err
}
block := new(types.Block)
return block, blockRLP, rlp.DecodeBytes(blockRLP, block)
blocks := mainnet.GetBlocks()
block0 = blocks[0]
block1 = blocks[1]
block2 = blocks[2]
block3 = blocks[3]
// 0x4be8251692195afc818c92b485fcb8a4691af89cbe5a2ab557b83a4261be2a9a
block1CoinbaseHash = crypto.Keccak256Hash(block1.Coinbase().Bytes())
// 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
block2CoinbaseHash = crypto.Keccak256Hash(block2.Coinbase().Bytes())
// 0x6efa174f00e64521a535f35e67c1aa241951c791639b2f3d060f49c5d9fa8b9e
block3CoinbaseHash = crypto.Keccak256Hash(block3.Coinbase().Bytes())
}
func TestBuilderOnMainnetBlocks(t *testing.T) {
@ -659,7 +623,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
}
test_helpers.RunBuilderTests(t,
statediff.NewBuilder(adapt.GethStateView(chain.StateCache())),
chain.StateCache(),
tests, params, test_helpers.CheckedRoots{
block1: block1RootBranchNode,
block2: block2RootBranchNode,

View File

@ -1,25 +1,30 @@
#!/bin/bash
# Builds and deploys a stack with only what we need.
# This script assumes we are running in the project root.
set -eux
set -e
cluster="${1:-test}"
laconic_so="${LACONIC_SO:-laconic-so} --stack fixturenet-plugeth-tx --verbose"
CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}")
# By default assume we are running in the project root
export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(realpath $(git rev-parse --show-toplevel)/..)}"
# Point stack-orchestrator to the multi-project root
export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}"
# v5 migrations only go up to version 18
echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=18 >> $CONFIG_DIR/stack.env
# Build and deploy a cluster with only what we need from the stack
$laconic_so setup-repositories \
--exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \
--branches-file ./test/stack-refs.txt
set -x
$laconic_so build-containers \
--exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff
if [[ -z $SKIP_BUILD ]]; then
$laconic_so setup-repositories \
--exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \
--branches-file ./test/stack-refs.txt
$laconic_so build-containers \
--exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff
fi
$laconic_so deploy \
--exclude foundry,keycloak,tx-spammer,ipld-eth-server \

View File

@ -819,23 +819,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err
}
output := func(node types2.StateLeafNode) error {
nodeSink := func(node types2.StateLeafNode) error {
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log,
metrics.IndexerMetrics.OutputTimer)
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
ipldOutput := func(c types2.IPLD) error {
ipldSink := func(c types2.IPLD) error {
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log,
metrics.IndexerMetrics.IPLDOutputTimer)
return sds.indexer.PushIPLD(tx, c)
}
err = sds.Builder.WriteStateDiffObject(Args{
err = sds.Builder.WriteStateDiff(Args{
NewStateRoot: block.Root(),
OldStateRoot: parentRoot,
BlockHash: block.Hash(),
BlockNumber: block.Number(),
}, params, output, ipldOutput)
}, params, nodeSink, ipldSink)
// TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil {
@ -885,7 +885,6 @@ func (sds *Service) UnsubscribeWriteStatus(id SubID) {
// add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
sds.writeLoopParams.Lock()
log.Debug("WatchAddress: locked sds.writeLoopParams")
defer sds.writeLoopParams.Unlock()
// get the current block number

View File

@ -41,7 +41,7 @@ import (
)
func init() {
test_helpers.SilenceLogs()
test_helpers.QuietLogs()
}
func TestServiceLoop(t *testing.T) {

View File

@ -3,7 +3,7 @@ services:
restart: on-failure
depends_on:
- ipld-eth-db
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.2-alpha
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.5-alpha
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "cerc_testing"

View File

@ -2,15 +2,15 @@ package test_helpers
import (
"bytes"
"encoding/json"
"sort"
"testing"
"github.com/cerc-io/plugeth-statediff"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require"
)
@ -24,44 +24,23 @@ type CheckedRoots = map[*types.Block][]byte
func RunBuilderTests(
t *testing.T,
builder statediff.Builder,
sdb state.Database,
tests []TestCase,
params statediff.Params,
roots CheckedRoots,
) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.Args, params)
if err != nil {
t.Error(err)
}
receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.Expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool {
return receivedStateDiffRlp[i] < receivedStateDiffRlp[j]
})
sort.Slice(expectedStateDiffRlp, func(i, j int) bool {
return expectedStateDiffRlp[i] < expectedStateDiffRlp[j]
})
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
actualb, err := json.Marshal(diff)
require.NoError(t, err)
expectedb, err := json.Marshal(test.Expected)
require.NoError(t, err)
t.Run(test.Name, func(t *testing.T) {
diff, err := builder.BuildStateDiffObject(test.Args, params)
if err != nil {
t.Error(err)
}
var expected, actual interface{}
err = json.Unmarshal(expectedb, &expected)
require.NoError(t, err)
err = json.Unmarshal(actualb, &actual)
require.NoError(t, err)
require.Equal(t, expected, actual, test.Name)
}
normalize(test.Expected)
normalize(&diff)
require.Equal(t, *test.Expected, diff)
})
}
// Let's also confirm that our root state nodes form the state root hash in the headers
for block, node := range roots {
@ -69,3 +48,24 @@ func RunBuilderTests(
"expected root does not match actual root", block.Number())
}
}
// Sorts contained state nodes, storage nodes, and IPLDs
func normalize(diff *sdtypes.StateObject) {
sort.Slice(diff.IPLDs, func(i, j int) bool {
return diff.IPLDs[i].CID < diff.IPLDs[j].CID
})
sort.Slice(diff.Nodes, func(i, j int) bool {
return bytes.Compare(
diff.Nodes[i].AccountWrapper.LeafKey,
diff.Nodes[j].AccountWrapper.LeafKey,
) < 0
})
for _, node := range diff.Nodes {
sort.Slice(node.StorageDiff, func(i, j int) bool {
return bytes.Compare(
node.StorageDiff[i].LeafKey,
node.StorageDiff[j].LeafKey,
) < 0
})
}
}

View File

@ -40,7 +40,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
}
// BuildStateDiffObject mock method
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
func (builder *Builder) WriteStateDiff(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.Args = args
builder.Params = params

View File

@ -6,8 +6,9 @@ import (
"github.com/cerc-io/plugeth-statediff/utils/log"
)
// The geth sync logs are noisy, it can be useful to silence them
func SilenceLogs() {
// QuietLogs silences the geth logs and sets the plugin test log level to "warning"
// The geth sync logs are noisy, so it can be nice to silence them.
func QuietLogs() {
geth_log.Root().SetHandler(geth_log.DiscardHandler())
log.TestLogger.SetLevel(2)
}

View File

@ -1,80 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package trie_helpers
import (
"sort"
"strings"
"time"
metrics2 "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/types"
)
// SortKeys sorts the keys in the account map
func SortKeys(data types.AccountMap) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
// FindIntersection finds the set of strings from both arrays that are equivalent
// a and b must first be sorted
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func FindIntersection(a, b []string) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
lenA := len(a)
lenB := len(b)
iOfA, iOfB := 0, 0
updates := make([]string, 0)
if iOfA >= lenA || iOfB >= lenB {
return updates
}
for {
switch strings.Compare(a[iOfA], b[iOfB]) {
// -1 when a[iOfA] < b[iOfB]
case -1:
iOfA++
if iOfA >= lenA {
return updates
}
// 0 when a[iOfA] == b[iOfB]
case 0:
updates = append(updates, a[iOfA])
iOfA++
iOfB++
if iOfA >= lenA || iOfB >= lenB {
return updates
}
// 1 when a[iOfA] > b[iOfB]
case 1:
iOfB++
if iOfB >= lenB {
return updates
}
}
}
}

View File

@ -1 +1,174 @@
package utils
import (
"bytes"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie"
)
type symmDiffIterator struct {
a, b iterState // Nodes returned are those in b - a and a - b (keys only)
yieldFromA bool // Whether next node comes from a
count int // Number of nodes scanned on either trie
eqPathIndex int // Count index of last pair of equal paths, to detect an updated key
}
// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric difference
// of elements in a and b, i.e., the elements in a that are not in b, and vice versa.
// Returns the iterator, and a pointer to an integer recording the number of nodes seen.
func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) (*symmDiffIterator, *int) {
it := &symmDiffIterator{
a: iterState{a, true},
b: iterState{b, true},
// common paths are detected by a distance <=1 from this index, so put it out of reach
eqPathIndex: -2,
}
return it, &it.count
}
// pairs an iterator with a cache of its valid status
type iterState struct {
trie.NodeIterator
valid bool
}
func (st *iterState) Next(descend bool) bool {
st.valid = st.NodeIterator.Next(descend)
return st.valid
}
func (it *symmDiffIterator) curr() *iterState {
if it.yieldFromA {
return &it.a
}
return &it.b
}
// FromA returns true if the current node is sourced from A.
func (it *symmDiffIterator) FromA() bool {
return it.yieldFromA
}
// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it
// represents an updated node.
func (it *symmDiffIterator) CommonPath() bool {
return it.count-it.eqPathIndex <= 1
}
func (it *symmDiffIterator) Hash() common.Hash {
return it.curr().Hash()
}
func (it *symmDiffIterator) Parent() common.Hash {
return it.curr().Parent()
}
func (it *symmDiffIterator) Leaf() bool {
return it.curr().Leaf()
}
func (it *symmDiffIterator) LeafKey() []byte {
return it.curr().LeafKey()
}
func (it *symmDiffIterator) LeafBlob() []byte {
return it.curr().LeafBlob()
}
func (it *symmDiffIterator) LeafProof() [][]byte {
return it.curr().LeafProof()
}
func (it *symmDiffIterator) Path() []byte {
return it.curr().Path()
}
func (it *symmDiffIterator) NodeBlob() []byte {
return it.curr().NodeBlob()
}
func (it *symmDiffIterator) AddResolver(resolver trie.NodeResolver) {
panic("not implemented")
}
func (it *symmDiffIterator) Next(bool) bool {
// NodeIterators start in a "pre-valid" state, so the first Next advances to a valid node.
if it.count == 0 {
if it.a.Next(true) {
it.count++
}
if it.b.Next(true) {
it.count++
}
} else {
if it.curr().Next(true) {
it.count++
}
}
it.seek()
return it.a.valid || it.b.valid
}
func (it *symmDiffIterator) seek() {
// Invariants:
// - At the end of the function, the sub-iterator with the lexically lesser path
// points to the next element
// - Said sub-iterator never points to an element present in the other
for {
if !it.b.valid {
it.yieldFromA = true
return
}
if !it.a.valid {
it.yieldFromA = false
return
}
cmp := bytes.Compare(it.a.Path(), it.b.Path())
if cmp == 0 {
it.eqPathIndex = it.count
cmp = compareNodes(&it.a, &it.b)
}
switch cmp {
case -1:
it.yieldFromA = true
return
case 1:
it.yieldFromA = false
return
case 0:
// if A and B have the same path and non-zero hash, they are identical and we can skip
// the whole subtree
noHash := it.a.Hash() == common.Hash{}
if it.a.Next(noHash) {
it.count++
}
if it.b.Next(noHash) {
it.count++
}
}
}
}
func (it *symmDiffIterator) Error() error {
if err := it.a.Error(); err != nil {
return err
}
return it.b.Error()
}
roysc marked this conversation as resolved Outdated

Remove dead code?

Remove dead code?
func compareNodes(a, b trie.NodeIterator) int {
if a.Leaf() && !b.Leaf() {
return -1
} else if b.Leaf() && !a.Leaf() {
return 1
}
if cmp := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); cmp != 0 {
return cmp
}
if a.Leaf() && b.Leaf() {
return bytes.Compare(a.LeafBlob(), b.LeafBlob())
}
return 0
}

224
utils/iterator_test.go Normal file
View File

@ -0,0 +1,224 @@
package utils_test
import (
"testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/trie"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/test_helpers"
"github.com/cerc-io/plugeth-statediff/utils"
)
type kvs struct{ k, v string }
var (
testdata1 = []kvs{
{"barb", "ba"},
{"bard", "bc"},
{"bars", "bb"},
{"bar", "b"},
{"fab", "z"},
{"food", "ab"},
{"foo", "a"},
}
testdata2 = []kvs{
{"aardvark", "c"},
{"bar", "b"},
{"barb", "bd"},
{"bars", "be"},
{"fab", "z"},
{"foo", "a"},
{"foos", "aa"},
{"jars", "d"},
}
)
func TestSymmetricDifferenceIterator(t *testing.T) {
t.Run("with no difference", func(t *testing.T) {
db := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(db)
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 0, *count)
triea.MustUpdate([]byte("foo"), []byte("bar"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 2, *count)
trieb := trie.NewEmpty(db)
di, count = utils.NewSymmetricDifferenceIterator(
triea.NodeIterator([]byte("jars")),
trieb.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
}
assert.Equal(t, 0, *count)
// // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838
// di, count = utils.NewSymmetricDifferenceIterator(
// triea.NodeIterator([]byte("food")),
// trieb.NodeIterator(nil))
// for di.Next(true) {
// t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
// }
// assert.Equal(t, 0, *count)
})
t.Run("small difference", func(t *testing.T) {
dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(dba)
dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
trieb := trie.NewEmpty(dbb)
trieb.MustUpdate([]byte("foo"), []byte("bar"))
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
leaves := 0
for di.Next(true) {
if di.Leaf() {
assert.False(t, di.CommonPath())
assert.Equal(t, "foo", string(di.LeafKey()))
assert.Equal(t, "bar", string(di.LeafBlob()))
leaves++
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 2, *count)
trieb.MustUpdate([]byte("quux"), []byte("bars"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux")))
leaves = 0
for di.Next(true) {
if di.Leaf() {
assert.False(t, di.CommonPath())
assert.Equal(t, "quux", string(di.LeafKey()))
assert.Equal(t, "bars", string(di.LeafBlob()))
leaves++
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 1, *count)
})
dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(dba)
for _, val := range testdata1 {
triea.MustUpdate([]byte(val.k), []byte(val.v))
}
dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
trieb := trie.NewEmpty(dbb)
for _, val := range testdata2 {
trieb.MustUpdate([]byte(val.k), []byte(val.v))
}
onlyA := make(map[string]string)
onlyB := make(map[string]string)
var deletions, creations []string
it, _ := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
for it.Next(true) {
if !it.Leaf() {
continue
}
key, value := string(it.LeafKey()), string(it.LeafBlob())
if it.FromA() {
onlyA[key] = value
if !it.CommonPath() {
deletions = append(deletions, key)
}
} else {
onlyB[key] = value
if !it.CommonPath() {
creations = append(creations, key)
}
}
}
expectedOnlyA := map[string]string{
"barb": "ba",
"bard": "bc",
"bars": "bb",
"food": "ab",
}
expectedOnlyB := map[string]string{
"aardvark": "c",
"barb": "bd",
"bars": "be",
"foos": "aa",
"jars": "d",
}
expectedDeletions := []string{
"bard",
"food",
}
expectedCreations := []string{
"aardvark",
"foos",
"jars",
}
assert.Equal(t, expectedOnlyA, onlyA)
assert.Equal(t, expectedOnlyB, onlyB)
assert.Equal(t, expectedDeletions, deletions)
assert.Equal(t, expectedCreations, creations)
}
// compare the paths traversed by the geth difference iterator and symmetric difference iterator
// within a sample of mainnet data.
func TestCompareDifferenceIterators(t *testing.T) {
test_helpers.QuietLogs()
db := rawdb.NewMemoryDatabase()
core.DefaultGenesisBlock().MustCommit(db)
blocks := mainnet.GetBlocks()
chain, _ := core.NewBlockChain(db, nil, nil, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
_, err := chain.InsertChain(blocks[1:])
if err != nil {
t.Fatal(err)
}
treeA, err := chain.StateCache().OpenTrie(blocks[1].Root())
if err != nil {
t.Fatal(err)
}
treeB, err := chain.StateCache().OpenTrie(blocks[2].Root())
if err != nil {
t.Fatal(err)
}
// collect the paths of nodes exclusive to A and B separately, then make sure the symmetric
// iterator produces the same sets
var pathsA, pathsB [][]byte
itBonly, _ := trie.NewDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
for itBonly.Next(true) {
pathsB = append(pathsB, itBonly.Path())
}
itAonly, _ := trie.NewDifferenceIterator(treeB.NodeIterator(nil), treeA.NodeIterator(nil))
for itAonly.Next(true) {
pathsA = append(pathsA, itAonly.Path())
}
itSym, _ := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
var idxA, idxB int
for itSym.Next(true) {
if itSym.FromA() {
require.Equal(t, pathsA[idxA], itSym.Path())
idxA++
} else {
require.Equal(t, pathsB[idxB], itSym.Path())
idxB++
}
}
require.Equal(t, len(pathsA), idxA)
require.Equal(t, len(pathsB), idxB)
}

View File

@ -1,9 +1,12 @@
package utils
import (
"encoding/json"
"fmt"
"os"
"github.com/cerc-io/plugeth-statediff/utils/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
@ -21,3 +24,38 @@ func MustDecode[T any](buf []byte) *T {
}
return &ret
}
// LoadConfig loads chain config from json file
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
file, err := os.Open(chainConfigPath)
if err != nil {
log.Error("Failed to read chain config file", "error", err)
return nil, err
}
defer file.Close()
chainConfig := new(params.ChainConfig)
if err := json.NewDecoder(file).Decode(chainConfig); err != nil {
log.Error("invalid chain config file", "error", err)
return nil, err
}
log.Debug("Using chain config", "path", chainConfigPath, "content", chainConfig)
return chainConfig, nil
}
// ChainConfig returns the appropriate ethereum chain config for the provided chain id
func ChainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}