Use symmetric difference iterator #11

Merged
roysc merged 27 commits from symmetric-diff-iterator into main 2023-09-20 03:22:19 +00:00
26 changed files with 865 additions and 702 deletions

View File

@ -11,49 +11,49 @@ on:
# Needed until we can incorporate docker startup into the executor container # Needed until we can incorporate docker startup into the executor container
env: env:
DOCKER_HOST: unix:///var/run/dind.sock DOCKER_HOST: unix:///var/run/dind.sock
SO_VERSION: v1.1.0-c30c779-202309082138
jobs: jobs:
unit-tests: unit-tests:
name: "Run unit tests" name: Run unit tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- uses: actions/setup-go@v3 - uses: actions/setup-go@v4
with: with:
go-version-file: 'go.mod' go-version-file: 'go.mod'
check-latest: true check-latest: true
- name: "Run dockerd" - name: Run dockerd
run: | run: |
dockerd -H $DOCKER_HOST --userland-proxy=false & dockerd -H $DOCKER_HOST --userland-proxy=false &
sleep 5 sleep 5
- name: "Run DB container" - name: Run DB container
run: | run: |
docker compose -f test/compose.yml up --wait docker compose -f test/compose.yml up --wait
- name: "Set up Gitea access token" - name: Set up Gitea access token
env: env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }} TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: | run: |
git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/ git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/
- name: "Run tests" - name: Run tests
run: go test -v ./... run: go test -p 1 -v ./...
integration-tests: integration-tests:
name: "Run integration tests" name: Run integration tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
path: ./plugeth-statediff path: ./plugeth-statediff
# TODO: replace with release
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: cerc-io/plugeth repository: cerc-io/plugeth
ref: statediff ref: statediff
path: ./plugeth path: ./plugeth
- name: "Run dockerd" - name: Run dockerd
run: dockerd -H $DOCKER_HOST --userland-proxy=false & run: dockerd -H $DOCKER_HOST --userland-proxy=false &
# These images need access tokens configured # These images need access tokens configured
- name: "Build docker image" - name: Build docker image
env: env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }} TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: | run: |
@ -63,26 +63,27 @@ jobs:
docker build ./plugeth -t cerc/plugeth:local \ docker build ./plugeth -t cerc/plugeth:local \
--build-arg GIT_VDBTO_TOKEN="$TOKEN" --build-arg GIT_VDBTO_TOKEN="$TOKEN"
- name: "Install stack-orchestrator" - name: Install stack-orchestrator
uses: actions/checkout@v3 run: |
with: curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so
repository: cerc-io/stack-orchestrator chmod +x laconic-so
ref: v1.1.0-e856616-202308032031 - name: Clone system-tests
path: ./stack-orchestrator
- run: |
apt-get update && apt-get install -y python3-pip
pip install ./stack-orchestrator
- name: "Clone system-tests"
uses: actions/checkout@v3 uses: actions/checkout@v3
with: with:
repository: cerc-io/system-tests repository: cerc-io/system-tests
ref: roy/plugeth-fixes ref: plugeth-compat
path: ./system-tests path: ./system-tests
token: ${{ secrets.CICD_REPO_TOKEN }} token: ${{ secrets.CICD_REPO_TOKEN }}
- name: "Run testnet stack" - name: Run testnet stack
working-directory: ./plugeth-statediff working-directory: ./plugeth-statediff
env:
LACONIC_SO: ../laconic-so
run: ./scripts/integration-setup.sh run: ./scripts/integration-setup.sh
- name: "Run tests" - name: Install Python
uses: actions/setup-python@v4
with:
python-version: 3.10
- name: Run tests
working-directory: ./system-tests working-directory: ./system-tests
run: | run: |
pip install pytest pip install pytest

View File

@ -9,7 +9,8 @@ $(MOCKS_DIR)/gen_backend.go:
github.com/openrelayxyz/plugeth-utils/core Backend,Downloader github.com/openrelayxyz/plugeth-utils/core Backend,Downloader
docker-image: mocks docker-image: mocks
docker build . -t "cerc/plugeth-statediff:local" docker build . -t "cerc/plugeth-statediff:local" \
--build-arg GIT_VDBTO_TOKEN
.PHONY: docker-image .PHONY: docker-image
# Local build # Local build

View File

@ -21,7 +21,6 @@ package statediff
import ( import (
"bytes" "bytes"
"encoding/hex"
"fmt" "fmt"
"time" "time"
@ -35,7 +34,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/trie_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils" "github.com/cerc-io/plugeth-statediff/utils"
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
@ -44,39 +42,34 @@ import (
var ( var (
emptyNode, _ = rlp.EncodeToBytes(&[]byte{}) emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode) emptyContractRoot = crypto.Keccak256Hash(emptyNode)
nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes() nullCodeHash = crypto.Keccak256([]byte{})
zeroHash common.Hash zeroHash common.Hash
) )
// Builder interface exposes the method for building a state diff between two blocks // Builder interface exposes the method for building a state diff between two blocks
type Builder interface { type Builder interface {
BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) BuildStateDiffObject(Args, Params) (sdtypes.StateObject, error)
WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink) error WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
} }
type StateDiffBuilder struct { type StateDiffBuilder struct {
StateCache adapt.StateView // state cache is safe for concurrent reads
stateCache adapt.StateView
} }
type IterPair struct { type iterPair struct {
Older, Newer trie.NodeIterator Older, Newer trie.NodeIterator
} }
func StateNodeAppender(nodes *[]sdtypes.StateLeafNode) sdtypes.StateNodeSink { type accountUpdate struct {
return func(node sdtypes.StateLeafNode) error { new sdtypes.AccountWrapper
*nodes = append(*nodes, node) oldRoot common.Hash
return nil
}
} }
func StorageNodeAppender(nodes *[]sdtypes.StorageLeafNode) sdtypes.StorageNodeSink { type accountUpdateMap map[string]*accountUpdate
return func(node sdtypes.StorageLeafNode) error {
*nodes = append(*nodes, node) func appender[T any](to *[]T) func(T) error {
return nil return func(a T) error {
} *to = append(*to, a)
}
func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink {
return func(c sdtypes.IPLD) error {
*iplds = append(*iplds, c)
return nil return nil
} }
} }
@ -84,7 +77,7 @@ func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink {
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder { func NewBuilder(stateCache adapt.StateView) Builder {
return &StateDiffBuilder{ return &StateDiffBuilder{
StateCache: stateCache, // state cache is safe for concurrent reads stateCache: stateCache,
} }
} }
@ -93,7 +86,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD var iplds []sdtypes.IPLD
err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) err := sdb.WriteStateDiff(args, params, appender(&stateNodes), appender(&iplds))
if err != nil { if err != nil {
return sdtypes.StateObject{}, err return sdtypes.StateObject{}, err
} }
@ -105,126 +98,110 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
}, nil }, nil
} }
// WriteStateDiffObject writes a statediff object to output sinks // WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink, func (sdb *StateDiffBuilder) WriteStateDiff(
ipldOutput sdtypes.IPLDSink) error { args Args, params Params,
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffObjectTimer) nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states // Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot) oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil { if err != nil {
return fmt.Errorf("error creating trie for oldStateRoot: %w", err) return fmt.Errorf("error opening old state trie: %w", err)
} }
newTrie, err := sdb.StateCache.OpenTrie(args.NewStateRoot) newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil { if err != nil {
return fmt.Errorf("error creating trie for newStateRoot: %w", err) return fmt.Errorf("error opening new state trie: %w", err)
} }
// we do two state trie iterations: iters := iterPair{
// one for new/updated nodes, Older: oldTrie.NodeIterator(nil),
// one for deleted/updated nodes; Newer: newTrie.NodeIterator(nil),
// prepare 2 iterator instances for each task
iterPairs := []IterPair{
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
{
Older: oldTrie.NodeIterator([]byte{}),
Newer: newTrie.NodeIterator([]byte{}),
},
} }
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
logger := log.New("hash", args.BlockHash.String(), "number", args.BlockNumber) err = sdb.processAccounts(
return sdb.BuildStateDiff(iterPairs, params, output, ipldOutput, logger, nil) iters.Older, iters.Newer,
} params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger)
func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params,
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) error {
logger.Trace("statediff BEGIN BuildStateDiff")
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer)
// collect a slice of all the nodes that were touched and exist at B (B-A)
// a map of their leafkey to all the accounts that were touched and exist at B
// and a slice of all the paths for the nodes in both of the above sets
diffAccountsAtB, err := sdb.createdAndUpdatedState(
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger, prefixPath)
if err != nil { if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err) return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
} }
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B
// a map of their leafkey to all the accounts that were touched and exist at A
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, output, logger, prefixPath)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err)
}
// collect and sort the leafkey keys for both account mappings into a slice
t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
logger.Debug("statediff BuildStateDiff sort", "duration", time.Since(t))
// and then find the intersection of these keys
// these are the leafkeys for the accounts which exist at both A and B but are different
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
// and leaving the truly created or deleted keys in place
t = time.Now()
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
logger.Debug("statediff BuildStateDiff intersection",
"count", len(updatedKeys),
"duration", time.Since(t))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %w", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %w", err)
}
return nil return nil
} }
// createdAndUpdatedState returns // processAccounts processes account creations and deletions, and returns a set of updated
// a slice of all the intermediate nodes that exist in a different state at B than A // existing accounts, indexed by leaf key.
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
// and a slice of the paths for all of the nodes included in both watchedAddressesLeafPaths [][]byte,
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator, nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
watchedAddressesLeafPaths [][]byte, output sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) { logger log.Logger,
logger.Trace("statediff BEGIN createdAndUpdatedState") ) error {
defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer) logger.Trace("statediff/processAccounts BEGIN")
diffAccountsAtB := make(sdtypes.AccountMap) defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
// cache the RLP of the previous node, so when we hit a leaf we have the parent (containing) node updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte var prevBlob []byte
it, itCount := trie.NewDifferenceIterator(a, b) it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) { for it.Next(true) {
// ignore node if it is not along paths of interest // ignore node if it is not along paths of interest
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue continue
} }
// index values by leaf key if it.FromA() { // Node exists in the old trie
if it.Leaf() { if it.Leaf() {
// if it is a "value" node, we will index the value by leaf key var account types.StateAccount
accountW, err := sdb.processStateValueNode(it, prevBlob) if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
if err != nil { return err
return nil, err }
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if it.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root
} else {
updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
}
} else {
// This node was removed, meaning the account was deleted. Emit empty
// "removed" records for the state node and all storage slots.
err := sdb.processAccountDeletion(leafKey, account, nodeSink)
if err != nil {
return err
}
}
} }
if accountW == nil {
continue continue
} }
// for now, just add it to diffAccountsAtB // Node exists in the new trie
// we will compare to diffAccountsAtA to determine which diffAccountsAtB if it.Leaf() {
// were creations and which were updates and also identify accounts that were removed going A->B accountW, err := sdb.decodeStateLeaf(it, prevBlob)
diffAccountsAtB[hex.EncodeToString(accountW.LeafKey)] = *accountW if err != nil {
return err
}
if it.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW
} else { } else {
// trie nodes will be written to blockstore only updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually }
// signifies a "value" node } else { // account was created
err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
if err != nil {
return err
}
}
} else {
// New trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
if it.Hash() == zeroHash { if it.Hash() == zeroHash {
continue continue
} }
@ -234,41 +211,108 @@ func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
if len(watchedAddressesLeafPaths) > 0 { if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{} var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return nil, err return err
} }
ok, err := isLeaf(elements) ok, err := isLeaf(elements)
if err != nil { if err != nil {
return nil, err return err
} }
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte)) partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...) valueNodePath := append(it.Path(), partialPath...)
if ok && !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue continue
} }
} }
if err := output(sdtypes.IPLD{ }
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
return nil, err return err
} }
prevBlob = nodeVal prevBlob = nodeVal
} }
} }
logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
for key, update := range updates {
var storageDiff []sdtypes.StorageLeafNode
err := sdb.processStorageUpdates(
update.oldRoot, update.new.Account.Root,
appender(&storageDiff), ipldSink,
)
if err != nil {
return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
if err = nodeSink(sdtypes.StateLeafNode{
AccountWrapper: update.new,
StorageDiff: storageDiff,
}); err != nil {
return err
}
}
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error() return it.Error()
}
func (sdb *StateDiffBuilder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
LeafKey: leafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
err := sdb.processRemovedAccountStorage(account.Root, appender(&diff.StorageDiff))
if err != nil {
return fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
return nodeSink(diff)
}
func (sdb *StateDiffBuilder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: *accountW,
}
if !bytes.Equal(accountW.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
err := sdb.processStorageCreations(accountW.Account.Root, appender(&diff.StorageDiff), ipldSink)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", accountW.LeafKey, err)
}
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(accountW.Account.CodeHash)
code, err := sdb.stateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %w", codeHash, err)
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
return nodeSink(diff)
} }
// decodes account at leaf and encodes RLP data to CID // decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
} }
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
return &sdtypes.AccountWrapper{ return &sdtypes.AccountWrapper{
LeafKey: it.LeafKey(), LeafKey: it.LeafKey(),
Account: &account, Account: &account,
@ -276,176 +320,34 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentB
}, nil }, nil
} }
// deletedOrUpdatedState returns a slice of all the paths that are emptied at B // processStorageCreations processes the storage node records for a newly created account
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap, func (sdb *StateDiffBuilder) processStorageCreations(
watchedAddressesLeafPaths [][]byte, output sdtypes.StateNodeSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) { sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
logger.Trace("statediff BEGIN deletedOrUpdatedState") ) error {
defer metrics.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics.IndexerMetrics.DeletedOrUpdatedStateTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
diffAccountAtA := make(sdtypes.AccountMap)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, prevBlob)
if err != nil {
return nil, err
}
if accountW == nil {
continue
}
leafKey := hex.EncodeToString(accountW.LeafKey)
diffAccountAtA[leafKey] = *accountW
// if this node's leaf key did not show up in diffAccountsAtB
// that means the account was deleted
// in that case, emit an empty "removed" diff state node
// include empty "removed" diff storage nodes for all the storage slots
if _, ok := diffAccountsAtB[leafKey]; !ok {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
Account: nil,
LeafKey: accountW.LeafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff))
if err != nil {
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
diff.StorageDiff = storageDiff
if err := output(diff); err != nil {
return nil, err
}
}
} else {
prevBlob = make([]byte, len(it.NodeBlob()))
copy(prevBlob, it.NodeBlob())
}
}
return diffAccountAtA, it.Error()
}
// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountUpdates",
"creations", len(creations), "deletions", len(deletions), "updated", len(updatedKeys))
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
deletedAcc := deletions[key]
storageDiff := make([]sdtypes.StorageLeafNode, 0)
if deletedAcc.Account != nil && createdAcc.Account != nil {
err = sdb.buildStorageNodesIncremental(
deletedAcc.Account.Root, createdAcc.Account.Root,
StorageNodeAppender(&storageDiff), ipldOutput,
)
if err != nil {
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
}
if err = output(sdtypes.StateLeafNode{
AccountWrapper: createdAcc,
Removed: false,
StorageDiff: storageDiff,
}); err != nil {
return err
}
delete(creations, key)
delete(deletions, key)
}
return nil
}
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap, output sdtypes.StateNodeSink,
ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountCreations")
defer metrics.ReportAndUpdateDuration("statediff END buildAccountCreations",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountCreationsTimer)
for _, val := range accounts {
diff := sdtypes.StateLeafNode{
AccountWrapper: val,
Removed: false,
}
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldOutput)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err)
}
diff.StorageDiff = storageDiff
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(val.Account.CodeHash)
code, err := sdb.StateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %x\r\n error: %w", codeHash, err)
}
if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
if err := output(diff); err != nil {
return err
}
}
return nil
}
// buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
if sr == emptyContractRoot { if sr == emptyContractRoot {
return nil return nil
} }
log.Debug("Storage root for eventual diff", "root", sr.String()) log.Debug("Storage root for eventual diff", "root", sr)
sTrie, err := sdb.StateCache.OpenTrie(sr) sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil { if err != nil {
log.Info("error in build storage diff eventual", "error", err) log.Info("error in build storage diff eventual", "error", err)
return err return err
} }
it := sTrie.NodeIterator(make([]byte, 0))
return sdb.buildStorageNodesFromTrie(it, output, ipldOutput)
}
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesFromTrieTimer)
var prevBlob []byte var prevBlob []byte
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) { for it.Next(true) {
if it.Leaf() { if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob) storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := output(storageLeafNode); err != nil { if err := storageSink(storageLeafNode); err != nil {
return err return err
} }
} else { } else {
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob()) copy(nodeVal, it.NodeBlob())
if err := ipldOutput(sdtypes.IPLD{ if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
@ -457,10 +359,104 @@ func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, out
return it.Error() return it.Error()
} }
// decodes account at leaf and encodes RLP data to CID // processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageUpdatesTimer)
if newroot == oldroot {
return nil
}
log.Trace("Storage roots for incremental diff", "old", oldroot, "new", newroot)
oldTrie, err := sdb.stateCache.OpenTrie(oldroot)
if err != nil {
return err
}
newTrie, err := sdb.stateCache.OpenTrie(newroot)
if err != nil {
return err
}
var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
if it.FromA() {
if it.Leaf() && !it.CommonPath() {
// If this node's leaf key is absent from B, the storage slot was vacated.
// In that case, emit an empty "removed" storage node record.
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: []byte(it.LeafKey()),
Value: []byte{},
}); err != nil {
return err
}
}
continue
}
if it.Leaf() {
storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := storageSink(storageLeafNode); err != nil {
return err
}
} else {
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
}
}
return it.Error()
}
// processRemovedAccountStorage emits a "removed" storage record for every leaf
// of the storage trie rooted at sr, i.e. for all slots of a destroyed account.
//
// An sr equal to emptyContractRoot produces no output. A failure to open the
// trie is logged and returned; sink and iterator errors are returned as-is.
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
	sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
	if sr == emptyContractRoot {
		return nil
	}
	log.Debug("Storage root for removed diffs", "root", sr)

	storageTrie, openErr := sdb.stateCache.OpenTrie(sr)
	if openErr != nil {
		log.Info("error in build removed account storage diffs", "error", openErr)
		return openErr
	}

	nodes := storageTrie.NodeIterator(nil)
	for nodes.Next(true) {
		// only leaf values are indexed, don't need to demarcate removed intermediate nodes
		if !nodes.Leaf() {
			continue
		}
		// Detach the key from the iterator before handing it to the sink.
		key := make([]byte, len(nodes.LeafKey()))
		copy(key, nodes.LeafKey())
		removed := sdtypes.StorageLeafNode{
			CID:     shared.RemovedNodeStorageCID,
			Removed: true,
			LeafKey: key,
			Value:   []byte{},
		}
		if sinkErr := storageSink(removed); sinkErr != nil {
			return sinkErr
		}
	}
	return nodes.Error()
}
// decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey())) leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey()) copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob())) value := make([]byte, len(it.LeafBlob()))
@ -473,127 +469,6 @@ func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, paren
} }
} }
// buildRemovedAccountStorageNodes emits "removed" storage diffs for every slot
// of a destroyed account whose storage trie is rooted at sr.
//
// An sr equal to emptyContractRoot yields no output. A failure to open the
// trie is logged and returned; otherwise the walk is delegated to
// buildRemovedStorageNodesFromTrie and its result is returned.
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output sdtypes.StorageNodeSink) error {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer)
	if sr == emptyContractRoot {
		return nil
	}
	log.Debug("Storage root for removed diffs", "root", sr.String())

	storageTrie, openErr := sdb.StateCache.OpenTrie(sr)
	if openErr != nil {
		log.Info("error in build removed account storage diffs", "error", openErr)
		return openErr
	}
	// Iterate from the beginning of the trie (empty start key).
	return sdb.buildRemovedStorageNodesFromTrie(storageTrie.NodeIterator([]byte{}), output)
}
// buildRemovedStorageNodesFromTrie walks the provided node iterator and sends a
// "removed" storage record to output for every leaf it visits. The first sink
// error aborts the walk; otherwise the iterator's own error is returned.
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink) error {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
	for it.Next(true) {
		// only leaf values are indexed, don't need to demarcate removed intermediate nodes
		if !it.Leaf() {
			continue
		}
		// Detach the key from the iterator before handing it to the sink.
		key := make([]byte, len(it.LeafKey()))
		copy(key, it.LeafKey())
		removed := sdtypes.StorageLeafNode{
			CID:     shared.RemovedNodeStorageCID,
			Removed: true,
			LeafKey: key,
			Value:   []byte{},
		}
		if sinkErr := output(removed); sinkErr != nil {
			return sinkErr
		}
	}
	return it.Error()
}
// buildStorageNodesIncremental emits the storage diff between the tries rooted
// at oldroot (A) and newroot (B).
//
// It runs two passes, each over a fresh pair of iterators: first
// createdAndUpdatedStorage, which also returns the set of leaf keys seen at B,
// then deletedOrUpdatedStorage, which uses that set to report vacated slots.
// Identical roots short-circuit to nil; any error is returned unchanged.
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, newroot common.Hash, output sdtypes.StorageNodeSink,
	ipldOutput sdtypes.IPLDSink) error {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer)
	if oldroot == newroot {
		return nil
	}
	log.Trace("Storage roots for incremental diff", "old", oldroot.String(), "new", newroot.String())

	oldTrie, err := sdb.StateCache.OpenTrie(oldroot)
	if err != nil {
		return err
	}
	newTrie, err := sdb.StateCache.OpenTrie(newroot)
	if err != nil {
		return err
	}

	// Pass 1: nodes created or updated at B.
	itA, itB := oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{})
	slotsAtB, err := sdb.createdAndUpdatedStorage(itA, itB, output, ipldOutput)
	if err != nil {
		return err
	}

	// Pass 2: slots present at A that were not reported in pass 1.
	itA, itB = oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{})
	return sdb.deletedOrUpdatedStorage(itA, itB, slotsAtB, output)
}
// createdAndUpdatedStorage walks trie.NewDifferenceIterator(a, b) and emits
// what it yields: leaves go to output (decoded by processStorageValueNode),
// other nodes with a non-zero hash go to ipldOutput.
//
// It returns the set of hex-encoded leaf keys that were emitted, together with
// the iterator's error. If a sink fails, the walk stops and (nil, err) is
// returned.
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output sdtypes.StorageNodeSink,
	ipldOutput sdtypes.IPLDSink) (map[string]bool, error) {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStorageTimer)

	seenKeys := make(map[string]bool)
	// parentBlob is the blob of the last non-leaf node emitted; it is handed to
	// processStorageValueNode as the parent of the next leaf.
	var parentBlob []byte

	// The second return value is unused by this function.
	diffIt, _ := trie.NewDifferenceIterator(a, b)
	for diffIt.Next(true) {
		if diffIt.Leaf() {
			leaf := sdb.processStorageValueNode(diffIt, parentBlob)
			if sinkErr := output(leaf); sinkErr != nil {
				return nil, sinkErr
			}
			seenKeys[hex.EncodeToString(leaf.LeafKey)] = true
			continue
		}

		if diffIt.Hash() == zeroHash {
			continue
		}

		// Copy blob and hash out of the iterator before handing them to the sink.
		blobCopy := make([]byte, len(diffIt.NodeBlob()))
		copy(blobCopy, diffIt.NodeBlob())
		hashCopy := make([]byte, len(diffIt.Hash().Bytes()))
		copy(hashCopy, diffIt.Hash().Bytes())

		block := sdtypes.IPLD{
			CID:     ipld.Keccak256ToCid(ipld.MEthStorageTrie, hashCopy).String(),
			Content: blobCopy,
		}
		if sinkErr := ipldOutput(block); sinkErr != nil {
			return nil, sinkErr
		}
		parentBlob = blobCopy
	}
	return seenKeys, diffIt.Error()
}
// deletedOrUpdatedStorage walks trie.NewDifferenceIterator(b, a) and, for each
// leaf whose hex-encoded key is not present in diffSlotsAtB, sends an empty
// "removed" storage record to output. The first sink error aborts the walk;
// otherwise the iterator's error is returned.
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output sdtypes.StorageNodeSink) error {
	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DeletedOrUpdatedStorageTimer)

	// Note the swapped argument order relative to createdAndUpdatedStorage.
	// The second return value is unused by this function.
	diffIt, _ := trie.NewDifferenceIterator(b, a)
	for diffIt.Next(true) {
		if !diffIt.Leaf() {
			continue
		}
		key := make([]byte, len(diffIt.LeafKey()))
		copy(key, diffIt.LeafKey())

		// A key that already showed up in diffSlotsAtB was handled as an update;
		// only keys missing from it mean the storage slot was vacated.
		if _, seen := diffSlotsAtB[hex.EncodeToString(key)]; seen {
			continue
		}
		removed := sdtypes.StorageLeafNode{
			CID:     shared.RemovedNodeStorageCID,
			Removed: true,
			LeafKey: key,
			Value:   []byte{},
		}
		if sinkErr := output(removed); sinkErr != nil {
			return sinkErr
		}
	}
	return diffIt.Error()
}
// isWatchedPathPrefix checks if a node path is a prefix (ancestor) to one of the watched addresses. // isWatchedPathPrefix checks if a node path is a prefix (ancestor) to one of the watched addresses.
// An empty watch list means all paths are watched. // An empty watch list means all paths are watched.
func isWatchedPathPrefix(watchedLeafPaths [][]byte, path []byte) bool { func isWatchedPathPrefix(watchedLeafPaths [][]byte, path []byte) bool {

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
@ -37,8 +36,8 @@ import (
var ( var (
contractLeafKey []byte contractLeafKey []byte
emptyDiffs = make([]sdtypes.StateLeafNode, 0) emptyDiffs []sdtypes.StateLeafNode
emptyStorage = make([]sdtypes.StorageLeafNode, 0) emptyStorage []sdtypes.StorageLeafNode
block0, block1, block2, block3, block4, block5, block6 *types.Block block0, block1, block2, block3, block4, block5, block6 *types.Block
builder statediff.Builder builder statediff.Builder
minerAddress = common.HexToAddress("0x0") minerAddress = common.HexToAddress("0x0")
@ -796,7 +795,7 @@ func TestBuilder(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), test_helpers.RunBuilderTests(t, chain.StateCache(),
tests, params, test_helpers.CheckedRoots{ tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
@ -1010,7 +1009,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
block2: block2BranchRootNode, block2: block2BranchRootNode,
@ -1260,7 +1259,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1394,7 +1393,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1597,7 +1596,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
@ -1824,7 +1823,7 @@ func TestBuilderWithMovedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block01BranchRootNode, block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode, block2: bankAccountAtBlock02LeafNode,
}) })
@ -2350,7 +2349,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,
@ -2399,10 +2398,9 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{ &sdtypes.StateObject{
BlockNumber: block0.Number(), BlockNumber: block0.Number(),
BlockHash: block0.Hash(), BlockHash: block0.Hash(),
Nodes: []sdtypes.StateLeafNode{}, // there's some kind of weird behavior where if our root node is a leaf node even
IPLDs: []sdtypes.IPLD{}, // there's some kind of weird behavior where if our root node is a leaf node // though it is along the path to the watched leaf (necessarily, as it is the root)
// even though it is along the path to the watched leaf (necessarily, as it is the root) it doesn't get included // it doesn't get included. inconsequential, but kinda odd.
// inconsequential, but kinda odd.
}, },
}, },
{ {
@ -2417,7 +2415,6 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{ &sdtypes.StateObject{
BlockNumber: block1.Number(), BlockNumber: block1.Number(),
BlockHash: block1.Hash(), BlockHash: block1.Hash(),
Nodes: []sdtypes.StateLeafNode{},
IPLDs: []sdtypes.IPLD{ IPLDs: []sdtypes.IPLD{
{ {
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1bBranchRootNode)).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1bBranchRootNode)).String(),
@ -2553,7 +2550,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,

View File

@ -99,6 +99,7 @@ func (p *ParamsWithMutex) CopyParams() Params {
// Args bundles the arguments for the state diff builder // Args bundles the arguments for the state diff builder
type Args struct { type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash OldStateRoot, NewStateRoot common.Hash
BlockHash common.Hash
BlockNumber *big.Int BlockNumber *big.Int
} }

2
go.mod
View File

@ -3,6 +3,7 @@ module github.com/cerc-io/plugeth-statediff
go 1.19 go 1.19
require ( require (
github.com/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum v1.11.6 github.com/ethereum/go-ethereum v1.11.6
github.com/georgysavva/scany v0.2.9 github.com/georgysavva/scany v0.2.9
github.com/golang/mock v1.6.0 github.com/golang/mock v1.6.0
@ -122,6 +123,7 @@ require (
) )
replace ( replace (
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
) )

2
go.sum
View File

@ -1,4 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU= git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0= git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0=

View File

@ -72,27 +72,15 @@ type IndexerMetricsHandles struct {
StateStoreCodeProcessingTimer metrics.Timer StateStoreCodeProcessingTimer metrics.Timer
// Fine-grained code timers // Fine-grained code timers
BuildStateDiffTimer metrics.Timer ProcessAccountsTimer metrics.Timer
CreatedAndUpdatedStateTimer metrics.Timer
DeletedOrUpdatedStateTimer metrics.Timer
BuildAccountUpdatesTimer metrics.Timer
BuildAccountCreationsTimer metrics.Timer
ResolveNodeTimer metrics.Timer
SortKeysTimer metrics.Timer
FindIntersectionTimer metrics.Timer
OutputTimer metrics.Timer OutputTimer metrics.Timer
IPLDOutputTimer metrics.Timer IPLDOutputTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter DifferenceIteratorCounter metrics.Counter
DeletedOrUpdatedStorageTimer metrics.Timer
CreatedAndUpdatedStorageTimer metrics.Timer
BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffObjectTimer metrics.Timer WriteStateDiffTimer metrics.Timer
BuildStorageNodesEventualTimer metrics.Timer ProcessStorageUpdatesTimer metrics.Timer
BuildStorageNodesFromTrieTimer metrics.Timer ProcessStorageCreationsTimer metrics.Timer
BuildRemovedAccountStorageNodesTimer metrics.Timer ProcessRemovedAccountStorageTimer metrics.Timer
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer IsWatchedAddressTimer metrics.Timer
} }
@ -109,27 +97,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
UncleProcessingTimer: metrics.NewTimer(), UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(), TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(), StateStoreCodeProcessingTimer: metrics.NewTimer(),
BuildStateDiffTimer: metrics.NewTimer(), ProcessAccountsTimer: metrics.NewTimer(),
CreatedAndUpdatedStateTimer: metrics.NewTimer(),
DeletedOrUpdatedStateTimer: metrics.NewTimer(),
BuildAccountUpdatesTimer: metrics.NewTimer(),
BuildAccountCreationsTimer: metrics.NewTimer(),
ResolveNodeTimer: metrics.NewTimer(),
SortKeysTimer: metrics.NewTimer(),
FindIntersectionTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(), OutputTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(), IPLDOutputTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(), DifferenceIteratorCounter: metrics.NewCounter(),
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(), BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffObjectTimer: metrics.NewTimer(), WriteStateDiffTimer: metrics.NewTimer(),
BuildStorageNodesEventualTimer: metrics.NewTimer(), ProcessStorageUpdatesTimer: metrics.NewTimer(),
BuildStorageNodesFromTrieTimer: metrics.NewTimer(), ProcessStorageCreationsTimer: metrics.NewTimer(),
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(), ProcessRemovedAccountStorageTimer: metrics.NewTimer(),
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(), IsWatchedAddressTimer: metrics.NewTimer(),
} }
subsys := "indexer" subsys := "indexer"
@ -144,28 +120,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
reg.Register(metricName(subsys, "t_build_statediff"), ctx.BuildStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_update_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer) reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer) reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer) reg.Register(metricName(subsys, "t_process_accounts"), ctx.ProcessAccountsTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer) reg.Register(metricName(subsys, "t_process_storage_updates"), ctx.ProcessStorageUpdatesTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer) reg.Register(metricName(subsys, "t_process_storage_creations"), ctx.ProcessStorageCreationsTimer)
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer) reg.Register(metricName(subsys, "t_process_removed_account_storage"), ctx.ProcessRemovedAccountStorageTimer)
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer) reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
log.Debug("Registering statediff indexer metrics.") log.Debug("Registering statediff indexer metrics.")

View File

@ -227,7 +227,7 @@ var (
CID: AccountLeafNodeCID, CID: AccountLeafNodeCID,
}, },
Removed: false, Removed: false,
StorageDiff: []sdtypes.StorageLeafNode{}, StorageDiff: nil,
}, },
{ {
AccountWrapper: sdtypes.AccountWrapper{ AccountWrapper: sdtypes.AccountWrapper{
@ -236,7 +236,7 @@ var (
CID: shared.RemovedNodeStateCID, CID: shared.RemovedNodeStateCID,
}, },
Removed: true, Removed: true,
StorageDiff: []sdtypes.StorageLeafNode{}, StorageDiff: nil,
}, },
{ {
AccountWrapper: sdtypes.AccountWrapper{ AccountWrapper: sdtypes.AccountWrapper{

View File

@ -16,7 +16,6 @@ import (
) )
var ( var (
pluginLoader core.PluginLoader
gethContext core.Context gethContext core.Context
service *statediff.Service service *statediff.Service
blockchain statediff.BlockChain blockchain statediff.BlockChain
@ -24,11 +23,7 @@ var (
func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) { func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
log.SetDefaultLogger(logger) log.SetDefaultLogger(logger)
pluginLoader = pl
gethContext = ctx gethContext = ctx
log.Debug("Initialized statediff plugin")
} }
func InitializeNode(stack core.Node, b core.Backend) { func InitializeNode(stack core.Node, b core.Backend) {
@ -58,7 +53,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to construct indexer", "error", err) log.Error("failed to construct indexer", "error", err)
} }
} }
service, err := statediff.NewService(serviceConfig, blockchain, backend, indexer) service, err = statediff.NewService(serviceConfig, blockchain, backend, indexer)
if err != nil { if err != nil {
log.Error("failed to construct service", "error", err) log.Error("failed to construct service", "error", err)
} }
@ -66,6 +61,8 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to start service", "error", err) log.Error("failed to start service", "error", err)
return return
} }
log.Debug("Initialized statediff plugin")
} }
func GetAPIs(stack core.Node, backend core.Backend) []core.API { func GetAPIs(stack core.Node, backend core.Backend) []core.API {

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -17,13 +17,10 @@
package statediff_test package statediff_test
import ( import (
"bytes"
"io"
"log"
"math/big" "math/big"
"os"
"testing" "testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -35,23 +32,21 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
) )
func init() { func init() {
test_helpers.SilenceLogs() test_helpers.QuietLogs()
} }
var ( var (
db ethdb.Database db ethdb.Database
genesisBlock, block0, block1, block2, block3 *types.Block genesisBlock, block0, block1, block2, block3 *types.Block
block1CoinbaseAddr, block2CoinbaseAddr, block3CoinbaseAddr common.Address
block1CoinbaseHash, block2CoinbaseHash, block3CoinbaseHash common.Hash block1CoinbaseHash, block2CoinbaseHash, block3CoinbaseHash common.Hash
builder statediff.Builder builder statediff.Builder
emptyStorage = make([]sdtypes.StorageLeafNode, 0) emptyStorage []sdtypes.StorageLeafNode
// block 1 data // block 1 data
block1CoinbaseAccount = &types.StateAccount{ block1CoinbaseAccount = &types.StateAccount{
@ -426,50 +421,19 @@ var (
func init() { func init() {
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
genesisBlock = core.DefaultGenesisBlock().MustCommit(db) genesisBlock = core.DefaultGenesisBlock().MustCommit(db)
genBy, err := rlp.EncodeToBytes(genesisBlock)
if err != nil {
log.Fatal(err)
}
var block0RLP []byte
block0, block0RLP, err = loadBlockFromRLPFile("./block0_rlp")
if err != nil {
log.Fatal(err)
}
if !bytes.Equal(genBy, block0RLP) {
log.Fatal("mainnet genesis blocks do not match")
}
block1, _, err = loadBlockFromRLPFile("./block1_rlp")
if err != nil {
log.Fatal(err)
}
block1CoinbaseAddr = block1.Coinbase()
block1CoinbaseHash = crypto.Keccak256Hash(block1CoinbaseAddr.Bytes())
block2, _, err = loadBlockFromRLPFile("./block2_rlp")
if err != nil {
log.Fatal(err)
}
block2CoinbaseAddr = block2.Coinbase()
block2CoinbaseHash = crypto.Keccak256Hash(block2CoinbaseAddr.Bytes()) // 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
block3, _, err = loadBlockFromRLPFile("./block3_rlp")
if err != nil {
log.Fatal(err)
}
block3CoinbaseAddr = block3.Coinbase()
block3CoinbaseHash = crypto.Keccak256Hash(block3CoinbaseAddr.Bytes())
}
func loadBlockFromRLPFile(filename string) (*types.Block, []byte, error) { blocks := mainnet.GetBlocks()
f, err := os.Open(filename) block0 = blocks[0]
if err != nil { block1 = blocks[1]
return nil, nil, err block2 = blocks[2]
} block3 = blocks[3]
defer f.Close()
blockRLP, err := io.ReadAll(f) // 0x4be8251692195afc818c92b485fcb8a4691af89cbe5a2ab557b83a4261be2a9a
if err != nil { block1CoinbaseHash = crypto.Keccak256Hash(block1.Coinbase().Bytes())
return nil, nil, err // 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
} block2CoinbaseHash = crypto.Keccak256Hash(block2.Coinbase().Bytes())
block := new(types.Block) // 0x6efa174f00e64521a535f35e67c1aa241951c791639b2f3d060f49c5d9fa8b9e
return block, blockRLP, rlp.DecodeBytes(blockRLP, block) block3CoinbaseHash = crypto.Keccak256Hash(block3.Coinbase().Bytes())
} }
func TestBuilderOnMainnetBlocks(t *testing.T) { func TestBuilderOnMainnetBlocks(t *testing.T) {
@ -659,7 +623,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
} }
test_helpers.RunBuilderTests(t, test_helpers.RunBuilderTests(t,
statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), chain.StateCache(),
tests, params, test_helpers.CheckedRoots{ tests, params, test_helpers.CheckedRoots{
block1: block1RootBranchNode, block1: block1RootBranchNode,
block2: block2RootBranchNode, block2: block2RootBranchNode,

View File

@ -1,25 +1,30 @@
#!/bin/bash #!/bin/bash
# Builds and deploys a stack with only what we need.
# This script assumes we are running in the project root.
set -eux set -e
cluster="${1:-test}" cluster="${1:-test}"
laconic_so="${LACONIC_SO:-laconic-so} --stack fixturenet-plugeth-tx --verbose" laconic_so="${LACONIC_SO:-laconic-so} --stack fixturenet-plugeth-tx --verbose"
CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}") CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}")
# By default assume we are running in the project root # Point stack-orchestrator to the multi-project root
export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(realpath $(git rev-parse --show-toplevel)/..)}" export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}"
# v5 migrations only go up to version 18 # v5 migrations only go up to version 18
echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=18 >> $CONFIG_DIR/stack.env echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=18 >> $CONFIG_DIR/stack.env
# Build and deploy a cluster with only what we need from the stack set -x
$laconic_so setup-repositories \
if [[ -z $SKIP_BUILD ]]; then
$laconic_so setup-repositories \
--exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \ --exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \
--branches-file ./test/stack-refs.txt --branches-file ./test/stack-refs.txt
$laconic_so build-containers \ $laconic_so build-containers \
--exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff --exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff
fi
$laconic_so deploy \ $laconic_so deploy \
--exclude foundry,keycloak,tx-spammer,ipld-eth-server \ --exclude foundry,keycloak,tx-spammer,ipld-eth-server \

View File

@ -819,23 +819,23 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err return err
} }
output := func(node types2.StateLeafNode) error { nodeSink := func(node types2.StateLeafNode) error {
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log, defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log,
metrics.IndexerMetrics.OutputTimer) metrics.IndexerMetrics.OutputTimer)
return sds.indexer.PushStateNode(tx, node, block.Hash().String()) return sds.indexer.PushStateNode(tx, node, block.Hash().String())
} }
ipldOutput := func(c types2.IPLD) error { ipldSink := func(c types2.IPLD) error {
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log, defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log,
metrics.IndexerMetrics.IPLDOutputTimer) metrics.IndexerMetrics.IPLDOutputTimer)
return sds.indexer.PushIPLD(tx, c) return sds.indexer.PushIPLD(tx, c)
} }
err = sds.Builder.WriteStateDiffObject(Args{ err = sds.Builder.WriteStateDiff(Args{
NewStateRoot: block.Root(), NewStateRoot: block.Root(),
OldStateRoot: parentRoot, OldStateRoot: parentRoot,
BlockHash: block.Hash(), BlockHash: block.Hash(),
BlockNumber: block.Number(), BlockNumber: block.Number(),
}, params, output, ipldOutput) }, params, nodeSink, ipldSink)
// TODO this anti-pattern needs to be sorted out eventually // TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil { if err = tx.Submit(err); err != nil {
@ -885,7 +885,6 @@ func (sds *Service) UnsubscribeWriteStatus(id SubID) {
// add | remove | set | clear // add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
sds.writeLoopParams.Lock() sds.writeLoopParams.Lock()
log.Debug("WatchAddress: locked sds.writeLoopParams")
defer sds.writeLoopParams.Unlock() defer sds.writeLoopParams.Unlock()
// get the current block number // get the current block number

View File

@ -41,7 +41,7 @@ import (
) )
func init() { func init() {
test_helpers.SilenceLogs() test_helpers.QuietLogs()
} }
func TestServiceLoop(t *testing.T) { func TestServiceLoop(t *testing.T) {

View File

@ -3,7 +3,7 @@ services:
restart: on-failure restart: on-failure
depends_on: depends_on:
- ipld-eth-db - ipld-eth-db
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.2-alpha image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v5.0.5-alpha
environment: environment:
DATABASE_USER: "vdbm" DATABASE_USER: "vdbm"
DATABASE_NAME: "cerc_testing" DATABASE_NAME: "cerc_testing"

View File

@ -2,15 +2,15 @@ package test_helpers
import ( import (
"bytes" "bytes"
"encoding/json"
"sort" "sort"
"testing" "testing"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -24,44 +24,23 @@ type CheckedRoots = map[*types.Block][]byte
func RunBuilderTests( func RunBuilderTests(
t *testing.T, t *testing.T,
builder statediff.Builder, sdb state.Database,
tests []TestCase, tests []TestCase,
params statediff.Params, params statediff.Params,
roots CheckedRoots, roots CheckedRoots,
) { ) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, test := range tests { for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
diff, err := builder.BuildStateDiffObject(test.Args, params) diff, err := builder.BuildStateDiffObject(test.Args, params)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff)
if err != nil {
t.Error(err)
}
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.Expected)
if err != nil {
t.Error(err)
}
sort.Slice(receivedStateDiffRlp, func(i, j int) bool {
return receivedStateDiffRlp[i] < receivedStateDiffRlp[j]
})
sort.Slice(expectedStateDiffRlp, func(i, j int) bool {
return expectedStateDiffRlp[i] < expectedStateDiffRlp[j]
})
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
actualb, err := json.Marshal(diff)
require.NoError(t, err)
expectedb, err := json.Marshal(test.Expected)
require.NoError(t, err)
var expected, actual interface{} normalize(test.Expected)
err = json.Unmarshal(expectedb, &expected) normalize(&diff)
require.NoError(t, err) require.Equal(t, *test.Expected, diff)
err = json.Unmarshal(actualb, &actual) })
require.NoError(t, err)
require.Equal(t, expected, actual, test.Name)
}
} }
// Let's also confirm that our root state nodes form the state root hash in the headers // Let's also confirm that our root state nodes form the state root hash in the headers
for block, node := range roots { for block, node := range roots {
@ -69,3 +48,24 @@ func RunBuilderTests(
"expected root does not match actual root", block.Number()) "expected root does not match actual root", block.Number())
} }
} }
// normalize reorders the contents of a state object in place so that two objects holding the
// same elements compare equal regardless of the order they were produced in: IPLDs are ordered
// by CID, state nodes by their account leaf key, and each node's storage diff by storage leaf key.
func normalize(diff *sdtypes.StateObject) {
	iplds, nodes := diff.IPLDs, diff.Nodes
	sort.Slice(iplds, func(x, y int) bool {
		return iplds[x].CID < iplds[y].CID
	})
	sort.Slice(nodes, func(x, y int) bool {
		keyX, keyY := nodes[x].AccountWrapper.LeafKey, nodes[y].AccountWrapper.LeafKey
		return bytes.Compare(keyX, keyY) < 0
	})
	for n := range nodes {
		// the slice header shares its backing array with the node, so this sorts in place
		storage := nodes[n].StorageDiff
		sort.Slice(storage, func(x, y int) bool {
			return bytes.Compare(storage[x].LeafKey, storage[y].LeafKey) < 0
		})
	}
}

View File

@ -40,7 +40,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
} }
// BuildStateDiffObject mock method // BuildStateDiffObject mock method
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { func (builder *Builder) WriteStateDiff(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.Args = args builder.Args = args
builder.Params = params builder.Params = params

View File

@ -6,8 +6,9 @@ import (
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
) )
// The geth sync logs are noisy, it can be useful to silence them // QuietLogs silences the geth logs and sets the plugin test log level to "warning"
func SilenceLogs() { // The geth sync logs are noisy, so it can be nice to silence them.
func QuietLogs() {
geth_log.Root().SetHandler(geth_log.DiscardHandler()) geth_log.Root().SetHandler(geth_log.DiscardHandler())
log.TestLogger.SetLevel(2) log.TestLogger.SetLevel(2)
} }

View File

@ -1,80 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package trie_helpers
import (
"sort"
"strings"
"time"
metrics2 "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/types"
)
// SortKeys returns the keys of the account map in ascending order.
// The time spent is recorded on the SortKeysTimer indexer metric.
func SortKeys(data types.AccountMap) []string {
	started := time.Now()
	defer metrics2.UpdateDuration(started, metrics2.IndexerMetrics.SortKeysTimer)

	sorted := make([]string, len(data))
	next := 0
	for k := range data {
		sorted[next] = k
		next++
	}
	sort.Strings(sorted)
	return sorted
}
// FindIntersection returns the strings that occur in both a and b.
// Both inputs must already be sorted in ascending order.
// It is used to find the keys that were both "deleted" and "created", i.e. were updated.
// The result is always a non-nil slice, empty when there is no overlap.
// The time spent is recorded on the FindIntersectionTimer indexer metric.
func FindIntersection(a, b []string) []string {
	defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)

	shared := []string{}
	// walk both sorted lists in step, always advancing the side holding the lesser element
	for i, j := 0, 0; i < len(a) && j < len(b); {
		switch order := strings.Compare(a[i], b[j]); {
		case order < 0:
			i++
		case order > 0:
			j++
		default:
			shared = append(shared, a[i])
			i++
			j++
		}
	}
	return shared
}

View File

@ -1 +1,174 @@
package utils package utils
import (
"bytes"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie"
)
// symmDiffIterator walks two trie node iterators in step and yields the nodes that are present
// in only one of them. At any time exactly one of the two sub-iterators is the "current" one
// (selected by yieldFromA); the node accessors delegate to it.
type symmDiffIterator struct {
	a, b        iterState // Nodes returned are those in b - a and a - b (keys only)
	yieldFromA  bool      // Whether next node comes from a
	count       int       // Number of nodes scanned on either trie
	eqPathIndex int       // Count index of last pair of equal paths, to detect an updated key
}
// NewSymmetricDifferenceIterator builds a trie.NodeIterator over the symmetric difference of
// a and b: the elements of a that are not in b, together with the elements of b that are not in a.
// It returns the iterator along with a pointer to its running count of nodes seen.
func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) (*symmDiffIterator, *int) {
	it := new(symmDiffIterator)
	it.a = iterState{NodeIterator: a, valid: true}
	it.b = iterState{NodeIterator: b, valid: true}
	// A common path is recognised when count is within 1 of eqPathIndex; starting at -2 keeps
	// the initial count of 0 outside that window.
	it.eqPathIndex = -2
	return it, &it.count
}
// iterState pairs an iterator with a cache of its valid status, so the wrapper can test
// whether a sub-iterator still has a current node without advancing it.
type iterState struct {
	trie.NodeIterator
	valid bool // result of the most recent Next call
}
// Next advances the wrapped iterator, remembers whether it succeeded, and returns that result.
func (st *iterState) Next(descend bool) bool {
	advanced := st.NodeIterator.Next(descend)
	st.valid = advanced
	return advanced
}
// curr returns the sub-iterator the current node is taken from: a when yieldFromA is set,
// b otherwise.
func (it *symmDiffIterator) curr() *iterState {
	src := &it.b
	if it.yieldFromA {
		src = &it.a
	}
	return src
}
// FromA returns true if the current node is sourced from A, and false if it is sourced from B.
func (it *symmDiffIterator) FromA() bool {
	return it.yieldFromA
}
// CommonPath reports whether a node with the current path exists in both sub-iterators,
// i.e. whether the current node represents an updated node rather than a created or deleted one.
// This holds when at most one node has been scanned since the last pair of equal paths was seen.
func (it *symmDiffIterator) CommonPath() bool {
	sinceEqualPaths := it.count - it.eqPathIndex
	return sinceEqualPaths <= 1
}
// Hash returns the hash reported by the sub-iterator currently being yielded from.
func (it *symmDiffIterator) Hash() common.Hash {
	src := it.curr()
	return src.Hash()
}

// Parent returns the parent hash reported by the sub-iterator currently being yielded from.
func (it *symmDiffIterator) Parent() common.Hash {
	src := it.curr()
	return src.Parent()
}

// Leaf reports whether the current sub-iterator is positioned on a leaf.
func (it *symmDiffIterator) Leaf() bool {
	src := it.curr()
	return src.Leaf()
}

// LeafKey returns the leaf key reported by the current sub-iterator.
func (it *symmDiffIterator) LeafKey() []byte {
	src := it.curr()
	return src.LeafKey()
}

// LeafBlob returns the leaf content reported by the current sub-iterator.
func (it *symmDiffIterator) LeafBlob() []byte {
	src := it.curr()
	return src.LeafBlob()
}

// LeafProof returns the leaf proof reported by the current sub-iterator.
func (it *symmDiffIterator) LeafProof() [][]byte {
	src := it.curr()
	return src.LeafProof()
}

// Path returns the path reported by the current sub-iterator.
func (it *symmDiffIterator) Path() []byte {
	src := it.curr()
	return src.Path()
}

// NodeBlob returns the node blob reported by the current sub-iterator.
func (it *symmDiffIterator) NodeBlob() []byte {
	src := it.curr()
	return src.NodeBlob()
}
// AddResolver is not supported by this iterator and always panics.
func (it *symmDiffIterator) AddResolver(resolver trie.NodeResolver) {
	panic("not implemented")
}
// Next moves to the next node of the symmetric difference and reports whether one exists.
// The descend argument is ignored; sub-iterators are always advanced with descend=true here.
func (it *symmDiffIterator) Next(bool) bool {
	if it.count != 0 {
		// only the side that produced the previous node needs to move on
		if it.curr().Next(true) {
			it.count++
		}
	} else {
		// NodeIterators start in a "pre-valid" state, so the first Next advances both sides
		// to a valid node.
		if it.a.Next(true) {
			it.count++
		}
		if it.b.Next(true) {
			it.count++
		}
	}
	it.seek()
	return it.a.valid || it.b.valid
}
// seek advances the sub-iterators past every node they have in common and selects which side
// yields next.
//
// Invariants:
//   - On return, the sub-iterator with the lexically lesser path points to the next element
//   - Said sub-iterator never points to an element present in the other
func (it *symmDiffIterator) seek() {
	for {
		// once one side is exhausted, whatever remains on the other side is yielded as-is
		if !it.b.valid {
			it.yieldFromA = true
			return
		}
		if !it.a.valid {
			it.yieldFromA = false
			return
		}

		order := bytes.Compare(it.a.Path(), it.b.Path())
		if order == 0 {
			// remember where the paths last coincided so CommonPath can detect an update
			it.eqPathIndex = it.count
			order = compareNodes(&it.a, &it.b)
		}
		if order != 0 {
			it.yieldFromA = order < 0
			return
		}

		// if A and B have the same path and non-zero hash, they are identical and we can skip
		// the whole subtree
		descend := it.a.Hash() == common.Hash{}
		if it.a.Next(descend) {
			it.count++
		}
		if it.b.Next(descend) {
			it.count++
		}
	}
}
// Error returns the error of sub-iterator a if it has one, otherwise the error of b.
func (it *symmDiffIterator) Error() error {
	err := it.a.Error()
	if err == nil {
		err = it.b.Error()
	}
	return err
}
// compareNodes orders the current nodes of two iterators: a leaf sorts before a non-leaf,
// then nodes are ordered by hash, and two leaves with equal hashes are ordered by leaf content.
// It returns a negative value, zero or a positive value in the manner of bytes.Compare.
func compareNodes(a, b trie.NodeIterator) int {
	leafA, leafB := a.Leaf(), b.Leaf()
	switch {
	case leafA && !leafB:
		return -1
	case leafB && !leafA:
		return 1
	}
	if order := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); order != 0 {
		return order
	}
	if leafA && leafB {
		return bytes.Compare(a.LeafBlob(), b.LeafBlob())
	}
	return 0
}

224
utils/iterator_test.go Normal file
View File

@ -0,0 +1,224 @@
package utils_test
import (
"testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/trie"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/test_helpers"
"github.com/cerc-io/plugeth-statediff/utils"
)
// kvs is a key/value pair to be inserted into a test trie.
type kvs struct{ k, v string }

// Fixtures for TestSymmetricDifferenceIterator: testdata1 populates trie A and testdata2
// populates trie B. Some keys appear in both with equal values, some in both with different
// values, and some in only one of the two.
var (
	testdata1 = []kvs{
		{"barb", "ba"},
		{"bard", "bc"},
		{"bars", "bb"},
		{"bar", "b"},
		{"fab", "z"},
		{"food", "ab"},
		{"foo", "a"},
	}
	testdata2 = []kvs{
		{"aardvark", "c"},
		{"bar", "b"},
		{"barb", "bd"},
		{"bars", "be"},
		{"fab", "z"},
		{"foo", "a"},
		{"foos", "aa"},
		{"jars", "d"},
	}
)
// TestSymmetricDifferenceIterator checks the nodes yielded by the symmetric difference iterator,
// the node count it reports, and its FromA/CommonPath classification, first on tiny tries and
// then on the testdata1/testdata2 fixtures.
func TestSymmetricDifferenceIterator(t *testing.T) {
	t.Run("with no difference", func(t *testing.T) {
		db := trie.NewDatabase(rawdb.NewMemoryDatabase())
		triea := trie.NewEmpty(db)
		// an empty trie compared against itself: nothing yielded, nothing counted
		di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
		for di.Next(true) {
			t.Errorf("iterator should not yield any elements")
		}
		assert.Equal(t, 0, *count)

		// a non-empty trie compared against itself: nodes are scanned on both sides but none differ
		triea.MustUpdate([]byte("foo"), []byte("bar"))
		di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
		for di.Next(true) {
			t.Errorf("iterator should not yield any elements")
		}
		assert.Equal(t, 2, *count)

		// A's iterator is started at "jars" and B is empty; no element is expected from either
		trieb := trie.NewEmpty(db)
		di, count = utils.NewSymmetricDifferenceIterator(
			triea.NodeIterator([]byte("jars")),
			trieb.NodeIterator(nil))
		for di.Next(true) {
			t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
		}
		assert.Equal(t, 0, *count)

		// // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838
		// di, count = utils.NewSymmetricDifferenceIterator(
		// triea.NodeIterator([]byte("food")),
		// trieb.NodeIterator(nil))
		// for di.Next(true) {
		// t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
		// }
		// assert.Equal(t, 0, *count)
	})

	t.Run("small difference", func(t *testing.T) {
		dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
		triea := trie.NewEmpty(dba)

		dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
		trieb := trie.NewEmpty(dbb)
		trieb.MustUpdate([]byte("foo"), []byte("bar"))

		// A is empty, so B's single entry must be yielded as a leaf that is not a common path
		di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
		leaves := 0
		for di.Next(true) {
			if di.Leaf() {
				assert.False(t, di.CommonPath())
				assert.Equal(t, "foo", string(di.LeafKey()))
				assert.Equal(t, "bar", string(di.LeafBlob()))
				leaves++
			}
		}
		assert.Equal(t, 1, leaves)
		assert.Equal(t, 2, *count)

		// with B's iterator started at "quux", only that entry is expected
		trieb.MustUpdate([]byte("quux"), []byte("bars"))
		di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux")))
		leaves = 0
		for di.Next(true) {
			if di.Leaf() {
				assert.False(t, di.CommonPath())
				assert.Equal(t, "quux", string(di.LeafKey()))
				assert.Equal(t, "bars", string(di.LeafBlob()))
				leaves++
			}
		}
		assert.Equal(t, 1, leaves)
		assert.Equal(t, 1, *count)
	})

	// Build trie A from testdata1 and trie B from testdata2, each in its own database.
	dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
	triea := trie.NewEmpty(dba)
	for _, val := range testdata1 {
		triea.MustUpdate([]byte(val.k), []byte(val.v))
	}
	dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
	trieb := trie.NewEmpty(dbb)
	for _, val := range testdata2 {
		trieb.MustUpdate([]byte(val.k), []byte(val.v))
	}

	// Collect every yielded leaf by the side it came from. Leaves whose path is not shared
	// with the other side are additionally recorded as deletions (from A) or creations (from B).
	onlyA := make(map[string]string)
	onlyB := make(map[string]string)
	var deletions, creations []string
	it, _ := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
	for it.Next(true) {
		if !it.Leaf() {
			continue
		}
		key, value := string(it.LeafKey()), string(it.LeafBlob())
		if it.FromA() {
			onlyA[key] = value
			if !it.CommonPath() {
				deletions = append(deletions, key)
			}
		} else {
			onlyB[key] = value
			if !it.CommonPath() {
				creations = append(creations, key)
			}
		}
	}

	// "barb" and "bars" exist in both fixtures with different values, so they appear on both
	// sides; entries identical in both fixtures ("bar", "fab", "foo") appear on neither.
	expectedOnlyA := map[string]string{
		"barb": "ba",
		"bard": "bc",
		"bars": "bb",
		"food": "ab",
	}
	expectedOnlyB := map[string]string{
		"aardvark": "c",
		"barb":     "bd",
		"bars":     "be",
		"foos":     "aa",
		"jars":     "d",
	}
	expectedDeletions := []string{
		"bard",
		"food",
	}
	expectedCreations := []string{
		"aardvark",
		"foos",
		"jars",
	}
	assert.Equal(t, expectedOnlyA, onlyA)
	assert.Equal(t, expectedOnlyB, onlyB)
	assert.Equal(t, expectedDeletions, deletions)
	assert.Equal(t, expectedCreations, creations)
}
// TestCompareDifferenceIterators compares the paths traversed by the geth difference iterator
// and the symmetric difference iterator within a sample of mainnet data.
func TestCompareDifferenceIterators(t *testing.T) {
	test_helpers.QuietLogs()

	db := rawdb.NewMemoryDatabase()
	core.DefaultGenesisBlock().MustCommit(db)
	blocks := mainnet.GetBlocks()
	chain, err := core.NewBlockChain(db, nil, nil, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	_, err = chain.InsertChain(blocks[1:])
	if err != nil {
		t.Fatal(err)
	}
	treeA, err := chain.StateCache().OpenTrie(blocks[1].Root())
	if err != nil {
		t.Fatal(err)
	}
	treeB, err := chain.StateCache().OpenTrie(blocks[2].Root())
	if err != nil {
		t.Fatal(err)
	}

	// NodeIterator.Path must not be retained across calls to Next (the iterator may reuse the
	// underlying buffer), so every collected path is copied. The full slice expression keeps
	// nil and empty paths as they are, so equality checks against a live Path() are unaffected.
	clonePath := func(p []byte) []byte {
		return append(p[:0:0], p...)
	}

	// collect the paths of nodes exclusive to A and B separately, then make sure the symmetric
	// iterator produces the same sets
	var pathsA, pathsB [][]byte
	itBonly, _ := trie.NewDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
	for itBonly.Next(true) {
		pathsB = append(pathsB, clonePath(itBonly.Path()))
	}
	itAonly, _ := trie.NewDifferenceIterator(treeB.NodeIterator(nil), treeA.NodeIterator(nil))
	for itAonly.Next(true) {
		pathsA = append(pathsA, clonePath(itAonly.Path()))
	}

	itSym, _ := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
	var idxA, idxB int
	for itSym.Next(true) {
		if itSym.FromA() {
			// fail cleanly instead of panicking if the symmetric iterator yields a surplus node
			require.Less(t, idxA, len(pathsA), "symmetric iterator yielded more nodes from A than expected")
			require.Equal(t, pathsA[idxA], itSym.Path())
			idxA++
		} else {
			require.Less(t, idxB, len(pathsB), "symmetric iterator yielded more nodes from B than expected")
			require.Equal(t, pathsB[idxB], itSym.Path())
			idxB++
		}
	}
	require.Equal(t, len(pathsA), idxA)
	require.Equal(t, len(pathsB), idxB)
}

View File

@ -1,9 +1,12 @@
package utils package utils
import ( import (
"encoding/json"
"fmt" "fmt"
"os" "os"
"github.com/cerc-io/plugeth-statediff/utils/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -21,3 +24,38 @@ func MustDecode[T any](buf []byte) *T {
} }
return &ret return &ret
} }
// LoadConfig reads the JSON file at chainConfigPath and decodes it into a chain config.
// Failures to open or decode the file are logged and returned to the caller.
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
	file, err := os.Open(chainConfigPath)
	if err != nil {
		log.Error("Failed to read chain config file", "error", err)
		return nil, err
	}
	defer file.Close()

	var chainConfig params.ChainConfig
	decoder := json.NewDecoder(file)
	if err = decoder.Decode(&chainConfig); err != nil {
		log.Error("invalid chain config file", "error", err)
		return nil, err
	}

	log.Debug("Using chain config", "path", chainConfigPath, "content", &chainConfig)
	return &chainConfig, nil
}
// ChainConfig returns the appropriate ethereum chain config for the provided chain id
func ChainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}