tests: update ipld-eth-db image #13

Closed
roysc wants to merge 2 commits from update-ipld-eth-db into main
35 changed files with 1037 additions and 845 deletions
Showing only changes of commit 56a98a84e5 - Show all commits

View File

@ -11,49 +11,49 @@ on:
# Needed until we can incorporate docker startup into the executor container # Needed until we can incorporate docker startup into the executor container
env: env:
DOCKER_HOST: unix:///var/run/dind.sock DOCKER_HOST: unix:///var/run/dind.sock
SO_VERSION: v1.1.0-c30c779-202309082138
jobs: jobs:
unit-tests: unit-tests:
name: "Run unit tests" name: Run unit tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
- uses: actions/setup-go@v3 - uses: actions/setup-go@v4
with: with:
go-version-file: 'go.mod' go-version-file: 'go.mod'
check-latest: true check-latest: true
- name: "Run dockerd" - name: Run dockerd
run: | run: |
dockerd -H $DOCKER_HOST --userland-proxy=false & dockerd -H $DOCKER_HOST --userland-proxy=false &
sleep 5 sleep 5
- name: "Run DB container" - name: Run DB container
run: | run: |
docker compose -f test/compose.yml up --wait docker compose -f test/compose.yml up --wait
- name: "Set up Gitea access token" - name: Set up Gitea access token
env: env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }} TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: | run: |
git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/ git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/
- name: "Run tests" - name: Run tests
run: go test -v ./... run: go test -p 1 -v ./...
integration-tests: integration-tests:
name: "Run integration tests" name: Run integration tests
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
path: ./plugeth-statediff path: ./plugeth-statediff
# TODO: replace with release
- uses: actions/checkout@v3 - uses: actions/checkout@v3
with: with:
repository: cerc-io/plugeth repository: cerc-io/plugeth
ref: statediff-wip ref: statediff
path: ./plugeth path: ./plugeth
- name: "Run dockerd" - name: Run dockerd
run: dockerd -H $DOCKER_HOST --userland-proxy=false & run: dockerd -H $DOCKER_HOST --userland-proxy=false &
# These images need access tokens configured # These images need access tokens configured
- name: "Build docker image" - name: Build docker image
env: env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }} TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: | run: |
@ -63,25 +63,27 @@ jobs:
docker build ./plugeth -t cerc/plugeth:local \ docker build ./plugeth -t cerc/plugeth:local \
--build-arg GIT_VDBTO_TOKEN="$TOKEN" --build-arg GIT_VDBTO_TOKEN="$TOKEN"
- name: "Install stack-orchestrator" - name: Install stack-orchestrator
uses: actions/checkout@v3 run: |
with: curl -L -O https://github.com/cerc-io/stack-orchestrator/releases/download/$SO_VERSION/laconic-so
repository: roysc/stack-orchestrator chmod +x laconic-so
ref: plugeth-testing - name: Clone system-tests
path: ./stack-orchestrator
- run: |
apt-get update && apt-get install -y python3-pip
pip install ./stack-orchestrator
- name: "Run testnet stack"
working-directory: ./plugeth-statediff
run: ./scripts/integration-setup.sh
- name: "Clone system-tests"
uses: actions/checkout@v3 uses: actions/checkout@v3
with: with:
repository: cerc-io/system-tests repository: cerc-io/system-tests
ref: main ref: plugeth-compat
path: ./system-tests path: ./system-tests
- name: "Run tests" token: ${{ secrets.CICD_REPO_TOKEN }}
- name: Run testnet stack
working-directory: ./plugeth-statediff
env:
LACONIC_SO: ../laconic-so
run: ./scripts/integration-setup.sh
- name: Install Python
uses: actions/setup-python@v4
with:
python-version: 3.10
- name: Run tests
working-directory: ./system-tests working-directory: ./system-tests
run: | run: |
pip install pytest pip install pytest

View File

@ -9,7 +9,8 @@ $(MOCKS_DIR)/gen_backend.go:
github.com/openrelayxyz/plugeth-utils/core Backend,Downloader github.com/openrelayxyz/plugeth-utils/core Backend,Downloader
docker-image: mocks docker-image: mocks
docker build . -t "cerc/plugeth-statediff:local" docker build . -t "cerc/plugeth-statediff:local" \
--build-arg GIT_VDBTO_TOKEN
.PHONY: docker-image .PHONY: docker-image
# Local build # Local build

View File

@ -21,21 +21,23 @@ package statediff
import ( import (
"bytes" "bytes"
"encoding/hex" "context"
"fmt" "fmt"
"sync"
"time" "time"
iterutils "github.com/cerc-io/eth-iterator-utils"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie"
"golang.org/x/sync/errgroup"
"github.com/cerc-io/plugeth-statediff/adapt" "github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/trie_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils" "github.com/cerc-io/plugeth-statediff/utils"
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
@ -44,56 +46,70 @@ import (
var ( var (
emptyNode, _ = rlp.EncodeToBytes(&[]byte{}) emptyNode, _ = rlp.EncodeToBytes(&[]byte{})
emptyContractRoot = crypto.Keccak256Hash(emptyNode) emptyContractRoot = crypto.Keccak256Hash(emptyNode)
nullCodeHash = crypto.Keccak256Hash([]byte{}).Bytes() nullCodeHash = crypto.Keccak256([]byte{})
zeroHash common.Hash zeroHash common.Hash
defaultSubtrieWorkers uint = 1
) )
// Builder interface exposes the method for building a state diff between two blocks // Builder interface exposes the method for building a state diff between two blocks
type Builder interface { type Builder interface {
BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) BuildStateDiffObject(Args, Params) (sdtypes.StateObject, error)
WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink) error WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
} }
type StateDiffBuilder struct { type StateDiffBuilder struct {
StateCache adapt.StateView // state cache is safe for concurrent reads
stateCache adapt.StateView
subtrieWorkers uint
} }
type IterPair struct { type accountUpdate struct {
Older, Newer trie.NodeIterator new sdtypes.AccountWrapper
oldRoot common.Hash
} }
type accountUpdateMap map[string]*accountUpdate
func StateNodeAppender(nodes *[]sdtypes.StateLeafNode) sdtypes.StateNodeSink { func appender[T any](to *[]T) func(T) error {
return func(node sdtypes.StateLeafNode) error { return func(a T) error {
*nodes = append(*nodes, node) *to = append(*to, a)
return nil return nil
} }
} }
func StorageNodeAppender(nodes *[]sdtypes.StorageLeafNode) sdtypes.StorageNodeSink {
return func(node sdtypes.StorageLeafNode) error { func syncedAppender[T any](to *[]T) func(T) error {
*nodes = append(*nodes, node) var mtx sync.Mutex
return nil return func(a T) error {
} mtx.Lock()
} *to = append(*to, a)
func IPLDMappingAppender(iplds *[]sdtypes.IPLD) sdtypes.IPLDSink { mtx.Unlock()
return func(c sdtypes.IPLD) error {
*iplds = append(*iplds, c)
return nil return nil
} }
} }
// NewBuilder is used to create a statediff builder // NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder { func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
return &StateDiffBuilder{ return &StateDiffBuilder{
StateCache: stateCache, // state cache is safe for concurrent reads stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers,
} }
} }
// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
// Passing 0 will reset this to the default value.
func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
if n == 0 {
n = defaultSubtrieWorkers
}
sdb.subtrieWorkers = n
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) { func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD var iplds []sdtypes.IPLD
err := sdb.WriteStateDiffObject(args, params, StateNodeAppender(&stateNodes), IPLDMappingAppender(&iplds)) err := sdb.WriteStateDiff(args, params, syncedAppender(&stateNodes), syncedAppender(&iplds))
if err != nil { if err != nil {
return sdtypes.StateObject{}, err return sdtypes.StateObject{}, err
} }
@ -105,170 +121,233 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
}, nil }, nil
} }
// WriteStateDiffObject writes a statediff object to output sinks // WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiffObject(args Args, params Params, output sdtypes.StateNodeSink, func (sdb *StateDiffBuilder) WriteStateDiff(
ipldOutput sdtypes.IPLDSink) error { args Args, params Params,
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffObjectTimer) nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states // Load tries for old and new states
oldTrie, err := sdb.StateCache.OpenTrie(args.OldStateRoot) triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil { if err != nil {
return fmt.Errorf("error creating trie for oldStateRoot: %w", err) return fmt.Errorf("error opening old state trie: %w", err)
} }
newTrie, err := sdb.StateCache.OpenTrie(args.NewStateRoot) trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil { if err != nil {
return fmt.Errorf("error creating trie for newStateRoot: %w", err) return fmt.Errorf("error opening new state trie: %w", err)
} }
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
// we do two state trie iterations: logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// one for new/updated nodes, // errgroup will cancel if any gr fails
// one for deleted/updated nodes; g, ctx := errgroup.WithContext(context.Background())
// prepare 2 iterator instances for each task for i := uint(0); i < sdb.subtrieWorkers; i++ {
iterPairs := []IterPair{ func(subdiv uint) {
{ g.Go(func() error {
Older: oldTrie.NodeIterator([]byte{}), a, b := subitersA[subdiv], subitersB[subdiv]
Newer: newTrie.NodeIterator([]byte{}), return sdb.processAccounts(ctx,
}, a, b, params.watchedAddressesLeafPaths,
{ nodeSink, ipldSink, logger,
Older: oldTrie.NodeIterator([]byte{}), )
Newer: newTrie.NodeIterator([]byte{}), })
}, }(i)
} }
return g.Wait()
logger := log.New("hash", args.BlockHash.String(), "number", args.BlockNumber)
return sdb.BuildStateDiff(iterPairs, params, output, ipldOutput, logger, nil)
} }
func (sdb *StateDiffBuilder) BuildStateDiff(iterPairs []IterPair, params Params, // processAccounts processes account creations and deletions, and returns a set of updated
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) error { // existing accounts, indexed by leaf key.
logger.Trace("statediff BEGIN BuildStateDiff") func (sdb *StateDiffBuilder) processAccounts(
defer metrics.ReportAndUpdateDuration("statediff END BuildStateDiff", time.Now(), logger, metrics.IndexerMetrics.BuildStateDiffTimer) ctx context.Context,
// collect a slice of all the nodes that were touched and exist at B (B-A) a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte,
// a map of their leafkey to all the accounts that were touched and exist at B nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
// and a slice of all the paths for the nodes in both of the above sets logger log.Logger,
diffAccountsAtB, err := sdb.createdAndUpdatedState( ) error {
iterPairs[0].Older, iterPairs[0].Newer, params.watchedAddressesLeafPaths, ipldOutput, logger, prefixPath) logger.Trace("statediff/processAccounts BEGIN")
if err != nil { defer metrics.ReportAndUpdateDuration("statediff/processAccounts END",
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err) time.Now(), logger, metrics.IndexerMetrics.ProcessAccountsTimer)
}
// collect a slice of all the nodes that existed at a path in A that doesn't exist in B updates := make(accountUpdateMap)
// a map of their leafkey to all the accounts that were touched and exist at A // Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
diffAccountsAtA, err := sdb.deletedOrUpdatedState(
iterPairs[1].Older, iterPairs[1].Newer, diffAccountsAtB,
params.watchedAddressesLeafPaths, output, logger, prefixPath)
if err != nil {
return fmt.Errorf("error collecting deletedOrUpdatedNodes: %w", err)
}
// collect and sort the leafkey keys for both account mappings into a slice
t := time.Now()
createKeys := trie_helpers.SortKeys(diffAccountsAtB)
deleteKeys := trie_helpers.SortKeys(diffAccountsAtA)
logger.Debug("statediff BuildStateDiff sort", "duration", time.Since(t))
// and then find the intersection of these keys
// these are the leafkeys for the accounts which exist at both A and B but are different
// this also mutates the passed in createKeys and deleteKeys, removing the intersection keys
// and leaving the truly created or deleted keys in place
t = time.Now()
updatedKeys := trie_helpers.FindIntersection(createKeys, deleteKeys)
logger.Debug("statediff BuildStateDiff intersection",
"count", len(updatedKeys),
"duration", time.Since(t))
// build the diff nodes for the updated accounts using the mappings at both A and B as directed by the keys found as the intersection of the two
err = sdb.buildAccountUpdates(diffAccountsAtB, diffAccountsAtA, updatedKeys, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for updated accounts: %w", err)
}
// build the diff nodes for created accounts
err = sdb.buildAccountCreations(diffAccountsAtB, output, ipldOutput, logger)
if err != nil {
return fmt.Errorf("error building diff for created accounts: %w", err)
}
return nil
}
// createdAndUpdatedState returns
// a slice of all the intermediate nodes that exist in a different state at B than A
// a mapping of their leafkeys to all the accounts that exist in a different state at B than A
// and a slice of the paths for all of the nodes included in both
func (sdb *StateDiffBuilder) createdAndUpdatedState(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte, output sdtypes.IPLDSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) {
logger.Trace("statediff BEGIN createdAndUpdatedState")
defer metrics.ReportAndUpdateDuration("statediff END createdAndUpdatedState", time.Now(), logger, metrics.IndexerMetrics.CreatedAndUpdatedStateTimer)
diffAccountsAtB := make(sdtypes.AccountMap)
// cache the RLP of the previous node, so when we hit a leaf we have the parent (containing) node
var prevBlob []byte var prevBlob []byte
it, itCount := trie.NewDifferenceIterator(a, b) it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
prevBlob = it.NodeBlob()
for it.Next(true) { for it.Next(true) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// ignore node if it is not along paths of interest // ignore node if it is not along paths of interest
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) { if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue continue
} }
// index values by leaf key if it.FromA() { // Node exists in the old trie
if it.Leaf() {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return err
}
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if it.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root
} else {
updates[string(leafKey)] = &accountUpdate{oldRoot: account.Root}
}
} else {
// This node was removed, meaning the account was deleted. Emit empty
// "removed" records for the state node and all storage all storage slots.
err := sdb.processAccountDeletion(leafKey, account, nodeSink)
if err != nil {
return err
}
}
}
continue
}
// Node exists in the new trie
if it.Leaf() { if it.Leaf() {
// if it is a "value" node, we will index the value by leaf key accountW, err := sdb.decodeStateLeaf(it, prevBlob)
accountW, err := sdb.processStateValueNode(it, prevBlob)
if err != nil { if err != nil {
return nil, err return err
} }
if accountW == nil {
continue if it.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW
} else {
updates[string(accountW.LeafKey)] = &accountUpdate{new: *accountW}
}
} else { // account was created
err := sdb.processAccountCreation(accountW, ipldSink, nodeSink)
if err != nil {
return err
}
} }
// for now, just add it to diffAccountsAtB
// we will compare to diffAccountsAtA to determine which diffAccountsAtB
// were creations and which were updates and also identify accounts that were removed going A->B
diffAccountsAtB[hex.EncodeToString(accountW.LeafKey)] = *accountW
} else { } else {
// trie nodes will be written to blockstore only // New trie nodes will be written to blockstore only.
// reminder that this includes leaf nodes, since the geth iterator.Leaf() actually // Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node // signifies a "value" node.
if it.Hash() == zeroHash { if it.Hash() == zeroHash {
continue continue
} }
// TODO - this can be handled when value node is (craeted?)
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob()) copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path // if doing a selective diff, we need to ensure this is a watched path
if len(watchedAddressesLeafPaths) > 0 { if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{} var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil { if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return nil, err return err
} }
ok, err := isLeaf(elements) ok, err := isLeaf(elements)
if err != nil { if err != nil {
return nil, err return err
} }
partialPath := utils.CompactToHex(elements[0].([]byte)) if ok {
valueNodePath := append(it.Path(), partialPath...) partialPath := utils.CompactToHex(elements[0].([]byte))
if ok && !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) { valueNodePath := append(it.Path(), partialPath...)
continue if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
} }
} }
if err := output(sdtypes.IPLD{ if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
return nil, err return err
} }
prevBlob = nodeVal prevBlob = nodeVal
} }
} }
logger.Debug("statediff COUNTS createdAndUpdatedState", "it", itCount, "diffAccountsAtB", len(diffAccountsAtB))
for key, update := range updates {
var storageDiff []sdtypes.StorageLeafNode
err := sdb.processStorageUpdates(
update.oldRoot, update.new.Account.Root,
appender(&storageDiff), ipldSink,
)
if err != nil {
return fmt.Errorf("error processing incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
if err = nodeSink(sdtypes.StateLeafNode{
AccountWrapper: update.new,
StorageDiff: storageDiff,
}); err != nil {
return err
}
}
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount)) metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return diffAccountsAtB, it.Error() return it.Error()
}
func (sdb *StateDiffBuilder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
LeafKey: leafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
err := sdb.processRemovedAccountStorage(account.Root, appender(&diff.StorageDiff))
if err != nil {
return fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
return nodeSink(diff)
}
func (sdb *StateDiffBuilder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
AccountWrapper: *accountW,
}
if !bytes.Equal(accountW.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
err := sdb.processStorageCreations(accountW.Account.Root, appender(&diff.StorageDiff), ipldSink)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", accountW.LeafKey, err)
}
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(accountW.Account.CodeHash)
code, err := sdb.stateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %w", codeHash, err)
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
return nodeSink(diff)
} }
// decodes account at leaf and encodes RLP data to CID // decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) { func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil { if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err) return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
} }
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
return &sdtypes.AccountWrapper{ return &sdtypes.AccountWrapper{
LeafKey: it.LeafKey(), LeafKey: it.LeafKey(),
Account: &account, Account: &account,
@ -276,176 +355,34 @@ func (sdb *StateDiffBuilder) processStateValueNode(it trie.NodeIterator, parentB
}, nil }, nil
} }
// deletedOrUpdatedState returns a slice of all the paths that are emptied at B // processStorageCreations processes the storage node records for a newly created account
// and a mapping of their leafkeys to all the accounts that exist in a different state at A than B // i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) deletedOrUpdatedState(a, b trie.NodeIterator, diffAccountsAtB sdtypes.AccountMap, func (sdb *StateDiffBuilder) processStorageCreations(
watchedAddressesLeafPaths [][]byte, output sdtypes.StateNodeSink, logger log.Logger, prefixPath []byte) (sdtypes.AccountMap, error) { sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
logger.Trace("statediff BEGIN deletedOrUpdatedState") ) error {
defer metrics.ReportAndUpdateDuration("statediff END deletedOrUpdatedState", time.Now(), logger, metrics.IndexerMetrics.DeletedOrUpdatedStateTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
diffAccountAtA := make(sdtypes.AccountMap)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.Leaf() {
accountW, err := sdb.processStateValueNode(it, prevBlob)
if err != nil {
return nil, err
}
if accountW == nil {
continue
}
leafKey := hex.EncodeToString(accountW.LeafKey)
diffAccountAtA[leafKey] = *accountW
// if this node's leaf key did not show up in diffAccountsAtB
// that means the account was deleted
// in that case, emit an empty "removed" diff state node
// include empty "removed" diff storage nodes for all the storage slots
if _, ok := diffAccountsAtB[leafKey]; !ok {
diff := sdtypes.StateLeafNode{
AccountWrapper: sdtypes.AccountWrapper{
Account: nil,
LeafKey: accountW.LeafKey,
CID: shared.RemovedNodeStateCID,
},
Removed: true,
}
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildRemovedAccountStorageNodes(accountW.Account.Root, StorageNodeAppender(&storageDiff))
if err != nil {
return nil, fmt.Errorf("failed building storage diffs for removed state account with key %x\r\nerror: %w", leafKey, err)
}
diff.StorageDiff = storageDiff
if err := output(diff); err != nil {
return nil, err
}
}
} else {
prevBlob = make([]byte, len(it.NodeBlob()))
copy(prevBlob, it.NodeBlob())
}
}
return diffAccountAtA, it.Error()
}
// buildAccountUpdates uses the account diffs maps for A => B and B => A and the known intersection of their leafkeys
// to generate the statediff node objects for all of the accounts that existed at both A and B but in different states
// needs to be called before building account creations and deletions as this mutates
// those account maps to remove the accounts which were updated
func (sdb *StateDiffBuilder) buildAccountUpdates(creations, deletions sdtypes.AccountMap, updatedKeys []string,
output sdtypes.StateNodeSink, ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountUpdates",
"creations", len(creations), "deletions", len(deletions), "updated", len(updatedKeys))
defer metrics.ReportAndUpdateDuration("statediff END buildAccountUpdates ",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountUpdatesTimer)
var err error
for _, key := range updatedKeys {
createdAcc := creations[key]
deletedAcc := deletions[key]
storageDiff := make([]sdtypes.StorageLeafNode, 0)
if deletedAcc.Account != nil && createdAcc.Account != nil {
err = sdb.buildStorageNodesIncremental(
deletedAcc.Account.Root, createdAcc.Account.Root,
StorageNodeAppender(&storageDiff), ipldOutput,
)
if err != nil {
return fmt.Errorf("failed building incremental storage diffs for account with leafkey %x\r\nerror: %w", key, err)
}
}
if err = output(sdtypes.StateLeafNode{
AccountWrapper: createdAcc,
Removed: false,
StorageDiff: storageDiff,
}); err != nil {
return err
}
delete(creations, key)
delete(deletions, key)
}
return nil
}
// buildAccountCreations returns the statediff node objects for all the accounts that exist at B but not at A
// it also returns the code and codehash for created contract accounts
func (sdb *StateDiffBuilder) buildAccountCreations(accounts sdtypes.AccountMap, output sdtypes.StateNodeSink,
ipldOutput sdtypes.IPLDSink, logger log.Logger) error {
logger.Trace("statediff BEGIN buildAccountCreations")
defer metrics.ReportAndUpdateDuration("statediff END buildAccountCreations",
time.Now(), logger, metrics.IndexerMetrics.BuildAccountCreationsTimer)
for _, val := range accounts {
diff := sdtypes.StateLeafNode{
AccountWrapper: val,
Removed: false,
}
if !bytes.Equal(val.Account.CodeHash, nullCodeHash) {
// For contract creations, any storage node contained is a diff
storageDiff := make([]sdtypes.StorageLeafNode, 0)
err := sdb.buildStorageNodesEventual(val.Account.Root, StorageNodeAppender(&storageDiff), ipldOutput)
if err != nil {
return fmt.Errorf("failed building eventual storage diffs for node with leaf key %x\r\nerror: %w", val.LeafKey, err)
}
diff.StorageDiff = storageDiff
// emit codehash => code mappings for contract
codeHash := common.BytesToHash(val.Account.CodeHash)
code, err := sdb.StateCache.ContractCode(codeHash)
if err != nil {
return fmt.Errorf("failed to retrieve code for codehash %x\r\n error: %w", codeHash, err)
}
if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.RawBinary, codeHash.Bytes()).String(),
Content: code,
}); err != nil {
return err
}
}
if err := output(diff); err != nil {
return err
}
}
return nil
}
// buildStorageNodesEventual builds the storage diff node objects for a created account
// i.e. it returns all the storage nodes at this state, since there is no previous state
func (sdb *StateDiffBuilder) buildStorageNodesEventual(sr common.Hash, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesEventualTimer)
if sr == emptyContractRoot { if sr == emptyContractRoot {
return nil return nil
} }
log.Debug("Storage root for eventual diff", "root", sr.String()) log.Debug("Storage root for eventual diff", "root", sr)
sTrie, err := sdb.StateCache.OpenTrie(sr) sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil { if err != nil {
log.Info("error in build storage diff eventual", "error", err) log.Info("error in build storage diff eventual", "error", err)
return err return err
} }
it := sTrie.NodeIterator(make([]byte, 0))
return sdb.buildStorageNodesFromTrie(it, output, ipldOutput)
}
// buildStorageNodesFromTrie returns all the storage diff node objects in the provided node iterator
func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesFromTrieTimer)
var prevBlob []byte var prevBlob []byte
it := sTrie.NodeIterator(make([]byte, 0))
for it.Next(true) { for it.Next(true) {
if it.Leaf() { if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob) storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := output(storageLeafNode); err != nil { if err := storageSink(storageLeafNode); err != nil {
return err return err
} }
} else { } else {
nodeVal := make([]byte, len(it.NodeBlob())) nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob()) copy(nodeVal, it.NodeBlob())
if err := ipldOutput(sdtypes.IPLD{ if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(), CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal, Content: nodeVal,
}); err != nil { }); err != nil {
@ -457,46 +394,88 @@ func (sdb *StateDiffBuilder) buildStorageNodesFromTrie(it trie.NodeIterator, out
return it.Error() return it.Error()
} }
// decodes account at leaf and encodes RLP data to CID // processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something func (sdb *StateDiffBuilder) processStorageUpdates(
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP. oldroot common.Hash, newroot common.Hash,
func (sdb *StateDiffBuilder) processStorageValueNode(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode { storageSink sdtypes.StorageNodeSink,
leafKey := make([]byte, len(it.LeafKey())) ipldSink sdtypes.IPLDSink,
copy(leafKey, it.LeafKey()) ) error {
value := make([]byte, len(it.LeafBlob())) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageUpdatesTimer)
copy(value, it.LeafBlob()) if newroot == oldroot {
return nil
return sdtypes.StorageLeafNode{
LeafKey: leafKey,
Value: value,
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(),
} }
log.Trace("Storage roots for incremental diff", "old", oldroot, "new", newroot)
oldTrie, err := sdb.stateCache.OpenTrie(oldroot)
if err != nil {
return err
}
newTrie, err := sdb.stateCache.OpenTrie(newroot)
if err != nil {
return err
}
var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
if it.FromA() {
if it.Leaf() && !it.CommonPath() {
// If this node's leaf key is absent from B, the storage slot was vacated.
// In that case, emit an empty "removed" storage node record.
if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: []byte(it.LeafKey()),
Value: []byte{},
}); err != nil {
return err
}
}
continue
}
if it.Leaf() {
storageLeafNode := sdb.decodeStorageLeaf(it, prevBlob)
if err := storageSink(storageLeafNode); err != nil {
return err
}
} else {
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
}
}
return it.Error()
} }
// buildRemovedAccountStorageNodes builds the "removed" diffs for all the storage nodes for a destroyed account // processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) buildRemovedAccountStorageNodes(sr common.Hash, output sdtypes.StorageNodeSink) error { func (sdb *StateDiffBuilder) processRemovedAccountStorage(
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedAccountStorageNodesTimer) sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
if sr == emptyContractRoot { if sr == emptyContractRoot {
return nil return nil
} }
log.Debug("Storage root for removed diffs", "root", sr.String()) log.Debug("Storage root for removed diffs", "root", sr)
sTrie, err := sdb.StateCache.OpenTrie(sr) sTrie, err := sdb.stateCache.OpenTrie(sr)
if err != nil { if err != nil {
log.Info("error in build removed account storage diffs", "error", err) log.Info("error in build removed account storage diffs", "error", err)
return err return err
} }
it := sTrie.NodeIterator(make([]byte, 0)) it := sTrie.NodeIterator(nil)
return sdb.buildRemovedStorageNodesFromTrie(it, output)
}
// buildRemovedStorageNodesFromTrie returns diffs for all the storage nodes in the provided node interator
func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterator, output sdtypes.StorageNodeSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildRemovedStorageNodesFromTrieTimer)
for it.Next(true) { for it.Next(true) {
if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes if it.Leaf() { // only leaf values are indexed, don't need to demarcate removed intermediate nodes
leafKey := make([]byte, len(it.LeafKey())) leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey()) copy(leafKey, it.LeafKey())
if err := output(sdtypes.StorageLeafNode{ if err := storageSink(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID, CID: shared.RemovedNodeStorageCID,
Removed: true, Removed: true,
LeafKey: leafKey, LeafKey: leafKey,
@ -509,89 +488,20 @@ func (sdb *StateDiffBuilder) buildRemovedStorageNodesFromTrie(it trie.NodeIterat
return it.Error() return it.Error()
} }
// buildStorageNodesIncremental builds the storage diff node objects for all nodes that exist in a different state at B than A // decodes slot at leaf and encodes RLP data to CID
func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldroot common.Hash, newroot common.Hash, output sdtypes.StorageNodeSink, // reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
ipldOutput sdtypes.IPLDSink) error { // that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStorageNodesIncrementalTimer) func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
if newroot == oldroot { leafKey := make([]byte, len(it.LeafKey()))
return nil copy(leafKey, it.LeafKey())
} value := make([]byte, len(it.LeafBlob()))
log.Trace("Storage roots for incremental diff", "old", oldroot.String(), "new", newroot.String()) copy(value, it.LeafBlob())
oldTrie, err := sdb.StateCache.OpenTrie(oldroot)
if err != nil {
return err
}
newTrie, err := sdb.StateCache.OpenTrie(newroot)
if err != nil {
return err
}
diffSlotsAtB, err := sdb.createdAndUpdatedStorage( return sdtypes.StorageLeafNode{
oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}), output, ipldOutput) LeafKey: leafKey,
if err != nil { Value: value,
return err CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(parentBlob)).String(),
} }
return sdb.deletedOrUpdatedStorage(oldTrie.NodeIterator([]byte{}), newTrie.NodeIterator([]byte{}),
diffSlotsAtB, output)
}
func (sdb *StateDiffBuilder) createdAndUpdatedStorage(a, b trie.NodeIterator, output sdtypes.StorageNodeSink,
ipldOutput sdtypes.IPLDSink) (map[string]bool, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.CreatedAndUpdatedStorageTimer)
diffSlotsAtB := make(map[string]bool)
var prevBlob []byte
it, _ := trie.NewDifferenceIterator(a, b)
for it.Next(true) {
if it.Leaf() {
storageLeafNode := sdb.processStorageValueNode(it, prevBlob)
if err := output(storageLeafNode); err != nil {
return nil, err
}
diffSlotsAtB[hex.EncodeToString(storageLeafNode.LeafKey)] = true
} else {
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
nodeHash := make([]byte, len(it.Hash().Bytes()))
copy(nodeHash, it.Hash().Bytes())
if err := ipldOutput(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, nodeHash).String(),
Content: nodeVal,
}); err != nil {
return nil, err
}
prevBlob = nodeVal
}
}
return diffSlotsAtB, it.Error()
}
func (sdb *StateDiffBuilder) deletedOrUpdatedStorage(a, b trie.NodeIterator, diffSlotsAtB map[string]bool, output sdtypes.StorageNodeSink) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.DeletedOrUpdatedStorageTimer)
it, _ := trie.NewDifferenceIterator(b, a)
for it.Next(true) {
if it.Leaf() {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
// if this node's leaf key did not show up in diffSlotsAtB
// that means the storage slot was vacated
// in that case, emit an empty "removed" diff storage node
if _, ok := diffSlotsAtB[hex.EncodeToString(leafKey)]; !ok {
if err := output(sdtypes.StorageLeafNode{
CID: shared.RemovedNodeStorageCID,
Removed: true,
LeafKey: leafKey,
Value: []byte{},
}); err != nil {
return err
}
}
}
}
return it.Error()
} }
// isWatchedPathPrefix checks if a node path is a prefix (ancestor) to one of the watched addresses. // isWatchedPathPrefix checks if a node path is a prefix (ancestor) to one of the watched addresses.

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/shared" "github.com/cerc-io/plugeth-statediff/indexer/shared"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
@ -37,8 +36,8 @@ import (
var ( var (
contractLeafKey []byte contractLeafKey []byte
emptyDiffs = make([]sdtypes.StateLeafNode, 0) emptyDiffs []sdtypes.StateLeafNode
emptyStorage = make([]sdtypes.StorageLeafNode, 0) emptyStorage []sdtypes.StorageLeafNode
block0, block1, block2, block3, block4, block5, block6 *types.Block block0, block1, block2, block3, block4, block5, block6 *types.Block
builder statediff.Builder builder statediff.Builder
minerAddress = common.HexToAddress("0x0") minerAddress = common.HexToAddress("0x0")
@ -796,13 +795,13 @@ func TestBuilder(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
tests, params, test_helpers.CheckedRoots{ test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
block2: block2BranchRootNode, block2: block2BranchRootNode,
block3: block3BranchRootNode, block3: block3BranchRootNode,
}) }.Check(t)
} }
func TestBuilderWithWatchedAddressList(t *testing.T) { func TestBuilderWithWatchedAddressList(t *testing.T) {
@ -1010,12 +1009,13 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode, block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode, block1: block1BranchRootNode,
block2: block2BranchRootNode, block2: block2BranchRootNode,
block3: block3BranchRootNode, block3: block3BranchRootNode,
}) }.Check(t)
} }
func TestBuilderWithRemovedAccountAndStorage(t *testing.T) { func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
@ -1260,11 +1260,12 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
}) }.Check(t)
} }
func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) { func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
@ -1394,11 +1395,12 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
}) }.Check(t)
} }
func TestBuilderWithRemovedWatchedAccount(t *testing.T) { func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
@ -1597,11 +1599,12 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode, block4: block4BranchRootNode,
block5: block5BranchRootNode, block5: block5BranchRootNode,
block6: block6BranchRootNode, block6: block6BranchRootNode,
}) }.Check(t)
} }
var ( var (
@ -1824,10 +1827,11 @@ func TestBuilderWithMovedAccount(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block01BranchRootNode, block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode, block2: bankAccountAtBlock02LeafNode,
}) }.Check(t)
} }
/* /*
@ -2350,11 +2354,12 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,
}) }.Check(t)
} }
func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) { func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
@ -2399,10 +2404,9 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{ &sdtypes.StateObject{
BlockNumber: block0.Number(), BlockNumber: block0.Number(),
BlockHash: block0.Hash(), BlockHash: block0.Hash(),
Nodes: []sdtypes.StateLeafNode{}, // there's some kind of weird behavior where if our root node is a leaf node even
IPLDs: []sdtypes.IPLD{}, // there's some kind of weird behavior where if our root node is a leaf node // though it is along the path to the watched leaf (necessarily, as it is the root)
// even though it is along the path to the watched leaf (necessarily, as it is the root) it doesn't get included // it doesn't get included. unconsequential, but kinda odd.
// unconsequential, but kinda odd.
}, },
}, },
{ {
@ -2417,7 +2421,6 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
&sdtypes.StateObject{ &sdtypes.StateObject{
BlockNumber: block1.Number(), BlockNumber: block1.Number(),
BlockHash: block1.Hash(), BlockHash: block1.Hash(),
Nodes: []sdtypes.StateLeafNode{},
IPLDs: []sdtypes.IPLD{ IPLDs: []sdtypes.IPLD{
{ {
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1bBranchRootNode)).String(), CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1bBranchRootNode)).String(),
@ -2553,11 +2556,12 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), tests, params, test_helpers.CheckedRoots{ test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block1bBranchRootNode, block1: block1bBranchRootNode,
block2: block2bBranchRootNode, block2: block2bBranchRootNode,
block3: block3bBranchRootNode, block3: block3bBranchRootNode,
}) }.Check(t)
} }
/* /*

View File

@ -44,6 +44,8 @@ type Config struct {
BackfillCheckPastBlocks uint64 BackfillCheckPastBlocks uint64
// Size of the worker pool // Size of the worker pool
NumWorkers uint NumWorkers uint
// Number of subtries to iterate in parallel
SubtrieWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain? // Should the statediff service wait until geth has synced to the head of the blockchain?
WaitForSync bool WaitForSync bool
// Context used during DB initialization // Context used during DB initialization
@ -99,6 +101,7 @@ func (p *ParamsWithMutex) CopyParams() Params {
// Args bundles the arguments for the state diff builder // Args bundles the arguments for the state diff builder
type Args struct { type Args struct {
OldStateRoot, NewStateRoot, BlockHash common.Hash OldStateRoot, NewStateRoot common.Hash
BlockNumber *big.Int BlockHash common.Hash
BlockNumber *big.Int
} }

12
go.mod
View File

@ -3,6 +3,8 @@ module github.com/cerc-io/plugeth-statediff
go 1.19 go 1.19
require ( require (
github.com/cerc-io/eth-iterator-utils v0.1.1
github.com/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum v1.11.6 github.com/ethereum/go-ethereum v1.11.6
github.com/georgysavva/scany v0.2.9 github.com/georgysavva/scany v0.2.9
github.com/golang/mock v1.6.0 github.com/golang/mock v1.6.0
@ -17,8 +19,9 @@ require (
github.com/openrelayxyz/plugeth-utils v1.2.0 github.com/openrelayxyz/plugeth-utils v1.2.0
github.com/pganalyze/pg_query_go/v4 v4.2.1 github.com/pganalyze/pg_query_go/v4 v4.2.1
github.com/shopspring/decimal v1.2.0 github.com/shopspring/decimal v1.2.0
github.com/stretchr/testify v1.8.1 github.com/stretchr/testify v1.8.2
github.com/thoas/go-funk v0.9.2 github.com/thoas/go-funk v0.9.3
golang.org/x/sync v0.1.0
) )
require ( require (
@ -108,7 +111,6 @@ require (
golang.org/x/crypto v0.1.0 // indirect golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/net v0.8.0 // indirect golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect golang.org/x/text v0.8.0 // indirect
@ -122,6 +124,8 @@ require (
) )
replace ( replace (
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230710223804-34971d65a36a github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
) )

16
go.sum
View File

@ -1,6 +1,10 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.vdb.to/cerc-io/plugeth v0.0.0-20230710223804-34971d65a36a h1:R3DoXSTTXc0xc3M/hOFppVitj1lk1cn2VWTsZloYZ/8= git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230710223804-34971d65a36a/go.mod h1:odpOaIpK01aVThIoAuw9YryLBJeHYOsDn9Mxm4LhB5s= git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag=
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0= git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0=
git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46/go.mod h1:VpDN61dxy64zGff05F0adujR5enD/JEdXBkTQ+PaIsQ= git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46/go.mod h1:VpDN61dxy64zGff05F0adujR5enD/JEdXBkTQ+PaIsQ=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
@ -491,12 +495,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk= github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4= github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA= github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=

View File

@ -57,7 +57,7 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig chainConfig *params.ChainConfig
nodeID string nodeID string
wg *sync.WaitGroup wg *sync.WaitGroup
removedCacheFlag *uint32 removedCacheFlag uint32
} }
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@ -130,7 +130,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts) // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
sdi.removedCacheFlag = new(uint32)
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String() blockHashStr := blockHash.String()
@ -223,11 +222,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string { func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode) sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String() headerID := header.Hash().String()
sdi.fileWriter.upsertHeaderCID(models.HeaderModel{ sdi.fileWriter.upsertHeaderCID(models.HeaderModel{
NodeIDs: pq.StringArray([]string{sdi.nodeID}), NodeIDs: pq.StringArray([]string{sdi.nodeID}),
@ -388,8 +382,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// publish the state node // publish the state node
var stateModel models.StateNodeModel var stateModel models.StateNodeModel
if stateNode.Removed { if stateNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
} }
stateModel = models.StateNodeModel{ stateModel = models.StateNodeModel{
@ -419,8 +413,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// if there are any storage nodes associated with this node, publish and index them // if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageDiff { for _, storageNode := range stateNode.StorageDiff {
if storageNode.Removed { if storageNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 { if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1) atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{}) sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
} }
storageModel := models.StorageNodeModel{ storageModel := models.StorageNodeModel{

View File

@ -72,65 +72,41 @@ type IndexerMetricsHandles struct {
StateStoreCodeProcessingTimer metrics.Timer StateStoreCodeProcessingTimer metrics.Timer
// Fine-grained code timers // Fine-grained code timers
BuildStateDiffTimer metrics.Timer ProcessAccountsTimer metrics.Timer
CreatedAndUpdatedStateTimer metrics.Timer OutputTimer metrics.Timer
DeletedOrUpdatedStateTimer metrics.Timer IPLDOutputTimer metrics.Timer
BuildAccountUpdatesTimer metrics.Timer DifferenceIteratorCounter metrics.Counter
BuildAccountCreationsTimer metrics.Timer BuildStateDiffObjectTimer metrics.Timer
ResolveNodeTimer metrics.Timer WriteStateDiffTimer metrics.Timer
SortKeysTimer metrics.Timer ProcessStorageUpdatesTimer metrics.Timer
FindIntersectionTimer metrics.Timer ProcessStorageCreationsTimer metrics.Timer
OutputTimer metrics.Timer ProcessRemovedAccountStorageTimer metrics.Timer
IPLDOutputTimer metrics.Timer IsWatchedAddressTimer metrics.Timer
DifferenceIteratorNextTimer metrics.Timer
DifferenceIteratorCounter metrics.Counter
DeletedOrUpdatedStorageTimer metrics.Timer
CreatedAndUpdatedStorageTimer metrics.Timer
BuildStorageNodesIncrementalTimer metrics.Timer
BuildStateDiffObjectTimer metrics.Timer
WriteStateDiffObjectTimer metrics.Timer
BuildStorageNodesEventualTimer metrics.Timer
BuildStorageNodesFromTrieTimer metrics.Timer
BuildRemovedAccountStorageNodesTimer metrics.Timer
BuildRemovedStorageNodesFromTrieTimer metrics.Timer
IsWatchedAddressTimer metrics.Timer
} }
func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles { func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
ctx := IndexerMetricsHandles{ ctx := IndexerMetricsHandles{
BlocksCounter: metrics.NewCounter(), BlocksCounter: metrics.NewCounter(),
TransactionsCounter: metrics.NewCounter(), TransactionsCounter: metrics.NewCounter(),
ReceiptsCounter: metrics.NewCounter(), ReceiptsCounter: metrics.NewCounter(),
LogsCounter: metrics.NewCounter(), LogsCounter: metrics.NewCounter(),
AccessListEntriesCounter: metrics.NewCounter(), AccessListEntriesCounter: metrics.NewCounter(),
FreePostgresTimer: metrics.NewTimer(), FreePostgresTimer: metrics.NewTimer(),
PostgresCommitTimer: metrics.NewTimer(), PostgresCommitTimer: metrics.NewTimer(),
HeaderProcessingTimer: metrics.NewTimer(), HeaderProcessingTimer: metrics.NewTimer(),
UncleProcessingTimer: metrics.NewTimer(), UncleProcessingTimer: metrics.NewTimer(),
TxAndRecProcessingTimer: metrics.NewTimer(), TxAndRecProcessingTimer: metrics.NewTimer(),
StateStoreCodeProcessingTimer: metrics.NewTimer(), StateStoreCodeProcessingTimer: metrics.NewTimer(),
BuildStateDiffTimer: metrics.NewTimer(), ProcessAccountsTimer: metrics.NewTimer(),
CreatedAndUpdatedStateTimer: metrics.NewTimer(), OutputTimer: metrics.NewTimer(),
DeletedOrUpdatedStateTimer: metrics.NewTimer(), IPLDOutputTimer: metrics.NewTimer(),
BuildAccountUpdatesTimer: metrics.NewTimer(), DifferenceIteratorCounter: metrics.NewCounter(),
BuildAccountCreationsTimer: metrics.NewTimer(), BuildStateDiffObjectTimer: metrics.NewTimer(),
ResolveNodeTimer: metrics.NewTimer(), WriteStateDiffTimer: metrics.NewTimer(),
SortKeysTimer: metrics.NewTimer(), ProcessStorageUpdatesTimer: metrics.NewTimer(),
FindIntersectionTimer: metrics.NewTimer(), ProcessStorageCreationsTimer: metrics.NewTimer(),
OutputTimer: metrics.NewTimer(), ProcessRemovedAccountStorageTimer: metrics.NewTimer(),
IPLDOutputTimer: metrics.NewTimer(), IsWatchedAddressTimer: metrics.NewTimer(),
DifferenceIteratorNextTimer: metrics.NewTimer(),
DifferenceIteratorCounter: metrics.NewCounter(),
DeletedOrUpdatedStorageTimer: metrics.NewTimer(),
CreatedAndUpdatedStorageTimer: metrics.NewTimer(),
BuildStorageNodesIncrementalTimer: metrics.NewTimer(),
BuildStateDiffObjectTimer: metrics.NewTimer(),
WriteStateDiffObjectTimer: metrics.NewTimer(),
BuildStorageNodesEventualTimer: metrics.NewTimer(),
BuildStorageNodesFromTrieTimer: metrics.NewTimer(),
BuildRemovedAccountStorageNodesTimer: metrics.NewTimer(),
BuildRemovedStorageNodesFromTrieTimer: metrics.NewTimer(),
IsWatchedAddressTimer: metrics.NewTimer(),
} }
subsys := "indexer" subsys := "indexer"
reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter) reg.Register(metricName(subsys, "blocks"), ctx.BlocksCounter)
@ -144,28 +120,15 @@ func RegisterIndexerMetrics(reg metrics.Registry) IndexerMetricsHandles {
reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer) reg.Register(metricName(subsys, "t_uncle_processing"), ctx.UncleProcessingTimer)
reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer) reg.Register(metricName(subsys, "t_tx_receipt_processing"), ctx.TxAndRecProcessingTimer)
reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer) reg.Register(metricName(subsys, "t_state_store_code_processing"), ctx.StateStoreCodeProcessingTimer)
reg.Register(metricName(subsys, "t_build_statediff"), ctx.BuildStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_update_state"), ctx.CreatedAndUpdatedStateTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_state"), ctx.DeletedOrUpdatedStateTimer)
reg.Register(metricName(subsys, "t_build_account_updates"), ctx.BuildAccountUpdatesTimer)
reg.Register(metricName(subsys, "t_build_account_creations"), ctx.BuildAccountCreationsTimer)
reg.Register(metricName(subsys, "t_resolve_node"), ctx.ResolveNodeTimer)
reg.Register(metricName(subsys, "t_sort_keys"), ctx.SortKeysTimer)
reg.Register(metricName(subsys, "t_find_intersection"), ctx.FindIntersectionTimer)
reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer) reg.Register(metricName(subsys, "t_output_fn"), ctx.OutputTimer)
reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer) reg.Register(metricName(subsys, "t_ipld_output_fn"), ctx.IPLDOutputTimer)
reg.Register(metricName(subsys, "t_difference_iterator_next"), ctx.DifferenceIteratorNextTimer)
reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter) reg.Register(metricName(subsys, "difference_iterator_counter"), ctx.DifferenceIteratorCounter)
reg.Register(metricName(subsys, "t_created_and_updated_storage"), ctx.CreatedAndUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_deleted_or_updated_storage"), ctx.DeletedOrUpdatedStorageTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_incremental"), ctx.BuildStorageNodesIncrementalTimer)
reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer) reg.Register(metricName(subsys, "t_build_statediff_object"), ctx.BuildStateDiffObjectTimer)
reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffObjectTimer) reg.Register(metricName(subsys, "t_write_statediff_object"), ctx.WriteStateDiffTimer)
reg.Register(metricName(subsys, "t_created_and_updated_state"), ctx.CreatedAndUpdatedStateTimer) reg.Register(metricName(subsys, "t_process_accounts"), ctx.ProcessAccountsTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_eventual"), ctx.BuildStorageNodesEventualTimer) reg.Register(metricName(subsys, "t_process_storage_updates"), ctx.ProcessStorageUpdatesTimer)
reg.Register(metricName(subsys, "t_build_storage_nodes_from_trie"), ctx.BuildStorageNodesFromTrieTimer) reg.Register(metricName(subsys, "t_process_storage_creations"), ctx.ProcessStorageCreationsTimer)
reg.Register(metricName(subsys, "t_build_removed_accounts_storage_nodes"), ctx.BuildRemovedAccountStorageNodesTimer) reg.Register(metricName(subsys, "t_process_removed_account_storage"), ctx.ProcessRemovedAccountStorageTimer)
reg.Register(metricName(subsys, "t_build_removed_storage_nodes_from_trie"), ctx.BuildRemovedStorageNodesFromTrieTimer)
reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer) reg.Register(metricName(subsys, "t_is_watched_address"), ctx.IsWatchedAddressTimer)
log.Debug("Registering statediff indexer metrics.") log.Debug("Registering statediff indexer metrics.")

View File

@ -84,9 +84,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) { func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now() start, t := time.Now(), time.Now()
blockHash := block.Hash() blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64() height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr) traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions() transactions := block.Transactions()
// Derive any missing fields // Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil { if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
@ -155,20 +154,20 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else { } else {
tDiff := time.Since(t) tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
if err := self.flush(); err != nil { if err := self.flush(); err != nil {
rollback(sdi.ctx, tx) rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg) log.Debug(traceMsg)
return err return err
} }
err = tx.Commit(sdi.ctx) err = tx.Commit(sdi.ctx)
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff) metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
} }
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String()) traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg) log.Debug(traceMsg)
return err return err
}, },
@ -178,7 +177,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
tDiff := time.Since(t) tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff) metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String()) traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index header, collect headerID // Publish and index header, collect headerID
@ -189,7 +188,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index uncles // Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles()) err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
@ -198,7 +197,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
// Publish and index receipts and txs // Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{ err = sdi.processReceiptsAndTxs(blockTx, processArgs{
@ -215,7 +214,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} }
tDiff = time.Since(t) tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff) metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String()) traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now() t = time.Now()
return blockTx, err return blockTx, err
@ -236,11 +235,6 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) { func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode) tx.cacheIPLD(headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String() headerID := header.Hash().String()
// index header // index header
return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{ return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{

View File

@ -58,18 +58,6 @@ type Statements interface {
InsertStorageStm() string InsertStorageStm() string
InsertIPLDStm() string InsertIPLDStm() string
InsertIPLDsStm() string InsertIPLDsStm() string
// Table/column descriptions for use with CopyFrom and similar commands.
LogTableName() []string
LogColumnNames() []string
RctTableName() []string
RctColumnNames() []string
StateTableName() []string
StateColumnNames() []string
StorageTableName() []string
StorageColumnNames() []string
TxTableName() []string
TxColumnNames() []string
} }
// Tx interface to accommodate different concrete SQL transaction types // Tx interface to accommodate different concrete SQL transaction types

View File

@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string {
func (db *DB) InsertIPLDsStm() string { func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING` return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
} }
func (db *DB) LogTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogColumnNames() []string {
return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
}
func (db *DB) RctTableName() []string {
return []string{"eth", "receipt_cids"}
}
func (db *DB) RctColumnNames() []string {
return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
}
func (db *DB) TxTableName() []string {
return []string{"eth", "transaction_cids"}
}
func (db *DB) TxColumnNames() []string {
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
}

View File

@ -36,12 +36,6 @@ var (
ctx = context.Background() ctx = context.Background()
) )
func expectContainsSubstring(t *testing.T, full string, sub string) {
if !strings.Contains(full, sub) {
t.Fatalf("Expected \"%v\" to contain substring \"%v\"\n", full, sub)
}
}
func TestPostgresPGX(t *testing.T) { func TestPostgresPGX(t *testing.T) {
t.Run("connects to the sql", func(t *testing.T) { t.Run("connects to the sql", func(t *testing.T) {
dbPool, err := pgxpool.ConnectConfig(context.Background(), pgxConfig) dbPool, err := pgxpool.ConnectConfig(context.Background(), pgxConfig)
@ -105,7 +99,7 @@ func TestPostgresPGX(t *testing.T) {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg) require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
}) })
t.Run("throws error when can't create node", func(t *testing.T) { t.Run("throws error when can't create node", func(t *testing.T) {
@ -117,6 +111,6 @@ func TestPostgresPGX(t *testing.T) {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg) require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
}) })
} }

View File

@ -102,7 +102,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg) require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
}) })
t.Run("throws error when can't create node", func(t *testing.T) { t.Run("throws error when can't create node", func(t *testing.T) {
@ -114,6 +114,6 @@ func TestPostgresSQLX(t *testing.T) {
t.Fatal("Expected an error") t.Fatal("Expected an error")
} }
expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg) require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
}) })
} }

View File

@ -29,6 +29,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics" "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces" "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/models" "github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
) )
// Writer handles processing and writing of indexed IPLD objects to Postgres // Writer handles processing and writing of indexed IPLD objects to Postgres
@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*
// pgx misdetects the parameter OIDs and selects int8, which can overflow. // pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text // unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast // and let PG handle the cast
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10)) err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(),
strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
return gaps, err return gaps, err
} }
@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
return insertError{"eth.transaction_cids", err, "COPY", transaction} return insertError{"eth.transaction_cids", err, "COPY", transaction}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(),
toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, int(transaction.Type), value))) transaction.Src, transaction.Index, int(transaction.Type), value)))
if err != nil { if err != nil {
@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
return insertError{"eth.receipt_cids", err, "COPY", rct} return insertError{"eth.receipt_cids", err, "COPY", rct}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(), _, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(),
toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract, toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, int(rct.PostStatus)))) rct.PostState, int(rct.PostStatus))))
if err != nil { if err != nil {
@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3)) log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
} }
if nil != rows && len(rows) >= 0 { if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows) _, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows)
if err != nil { if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows} return insertError{"eth.log_cids", err, "COPY", rows}
} }
@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
return insertError{"eth.state_cids", err, "COPY", stateNode} return insertError{"eth.state_cids", err, "COPY", stateNode}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID, toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed))) true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
if err != nil { if err != nil {
@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
return insertError{"eth.storage_cids", err, "COPY", storageCID} return insertError{"eth.storage_cids", err, "COPY", storageCID}
} }
_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(), _, err = tx.CopyFrom(w.db.Context(),
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID, toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed))) true, storageCID.Value, storageCID.Removed)))
if err != nil { if err != nil {

View File

@ -227,7 +227,7 @@ var (
CID: AccountLeafNodeCID, CID: AccountLeafNodeCID,
}, },
Removed: false, Removed: false,
StorageDiff: []sdtypes.StorageLeafNode{}, StorageDiff: nil,
}, },
{ {
AccountWrapper: sdtypes.AccountWrapper{ AccountWrapper: sdtypes.AccountWrapper{
@ -236,7 +236,7 @@ var (
CID: shared.RemovedNodeStateCID, CID: shared.RemovedNodeStateCID,
}, },
Removed: true, Removed: true,
StorageDiff: []sdtypes.StorageLeafNode{}, StorageDiff: nil,
}, },
{ {
AccountWrapper: sdtypes.AccountWrapper{ AccountWrapper: sdtypes.AccountWrapper{

View File

@ -45,6 +45,7 @@ type Column struct {
Type colType Type colType
Array bool Array bool
} }
type Table struct { type Table struct {
Name string Name string
Columns []Column Columns []Column
@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string {
) )
} }
// TableName returns a pgx-compatible table name.
func (tbl *Table) TableName() []string {
return strings.Split(tbl.Name, ".")
}
// ColumnNames returns the ordered list of column names.
func (tbl *Table) ColumnNames() []string {
var names []string
for _, col := range tbl.Columns {
names = append(names, col.Name)
}
return names
}
func sprintf(f string) colfmt { func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) } return func(x interface{}) string { return fmt.Sprintf(f, x) }
} }

View File

@ -46,7 +46,11 @@ func init() {
) )
Flags.UintVar(&config.NumWorkers, Flags.UintVar(&config.NumWorkers,
"statediff.workers", 1, "statediff.workers", 1,
"Number of concurrent workers to use during statediff processing (default 1)", "Number of concurrent workers to dispatch to during statediff processing",
)
Flags.UintVar(&config.SubtrieWorkers,
"statediff.subtries", 1,
"Number of subtries to iterate in parallel",
) )
Flags.BoolVar(&config.WaitForSync, Flags.BoolVar(&config.WaitForSync,
"statediff.waitforsync", false, "statediff.waitforsync", false,

View File

@ -16,19 +16,14 @@ import (
) )
var ( var (
pluginLoader core.PluginLoader gethContext core.Context
gethContext core.Context service *statediff.Service
service *statediff.Service blockchain statediff.BlockChain
blockchain statediff.BlockChain
) )
func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) { func Initialize(ctx core.Context, pl core.PluginLoader, logger core.Logger) {
log.SetDefaultLogger(logger) log.SetDefaultLogger(logger)
pluginLoader = pl
gethContext = ctx gethContext = ctx
log.Debug("Initialized statediff plugin")
} }
func InitializeNode(stack core.Node, b core.Backend) { func InitializeNode(stack core.Node, b core.Backend) {
@ -58,7 +53,7 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to construct indexer", "error", err) log.Error("failed to construct indexer", "error", err)
} }
} }
service, err := statediff.NewService(serviceConfig, blockchain, backend, indexer) service, err = statediff.NewService(serviceConfig, blockchain, backend, indexer)
if err != nil { if err != nil {
log.Error("failed to construct service", "error", err) log.Error("failed to construct service", "error", err)
} }
@ -66,6 +61,8 @@ func InitializeNode(stack core.Node, b core.Backend) {
log.Error("failed to start service", "error", err) log.Error("failed to start service", "error", err)
return return
} }
log.Debug("Initialized statediff plugin")
} }
func GetAPIs(stack core.Node, backend core.Backend) []core.API { func GetAPIs(stack core.Node, backend core.Backend) []core.API {

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -17,13 +17,10 @@
package statediff_test package statediff_test
import ( import (
"bytes"
"io"
"log"
"math/big" "math/big"
"os"
"testing" "testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash" "github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -35,23 +32,21 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
statediff "github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/ipld" "github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/test_helpers" "github.com/cerc-io/plugeth-statediff/test_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
) )
func init() { func init() {
test_helpers.SilenceLogs() test_helpers.QuietLogs()
} }
var ( var (
db ethdb.Database db ethdb.Database
genesisBlock, block0, block1, block2, block3 *types.Block genesisBlock, block0, block1, block2, block3 *types.Block
block1CoinbaseAddr, block2CoinbaseAddr, block3CoinbaseAddr common.Address
block1CoinbaseHash, block2CoinbaseHash, block3CoinbaseHash common.Hash block1CoinbaseHash, block2CoinbaseHash, block3CoinbaseHash common.Hash
builder statediff.Builder builder statediff.Builder
emptyStorage = make([]sdtypes.StorageLeafNode, 0) emptyStorage []sdtypes.StorageLeafNode
// block 1 data // block 1 data
block1CoinbaseAccount = &types.StateAccount{ block1CoinbaseAccount = &types.StateAccount{
@ -426,50 +421,19 @@ var (
func init() { func init() {
db = rawdb.NewMemoryDatabase() db = rawdb.NewMemoryDatabase()
genesisBlock = core.DefaultGenesisBlock().MustCommit(db) genesisBlock = core.DefaultGenesisBlock().MustCommit(db)
genBy, err := rlp.EncodeToBytes(genesisBlock)
if err != nil {
log.Fatal(err)
}
var block0RLP []byte
block0, block0RLP, err = loadBlockFromRLPFile("./block0_rlp")
if err != nil {
log.Fatal(err)
}
if !bytes.Equal(genBy, block0RLP) {
log.Fatal("mainnet genesis blocks do not match")
}
block1, _, err = loadBlockFromRLPFile("./block1_rlp")
if err != nil {
log.Fatal(err)
}
block1CoinbaseAddr = block1.Coinbase()
block1CoinbaseHash = crypto.Keccak256Hash(block1CoinbaseAddr.Bytes())
block2, _, err = loadBlockFromRLPFile("./block2_rlp")
if err != nil {
log.Fatal(err)
}
block2CoinbaseAddr = block2.Coinbase()
block2CoinbaseHash = crypto.Keccak256Hash(block2CoinbaseAddr.Bytes()) // 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
block3, _, err = loadBlockFromRLPFile("./block3_rlp")
if err != nil {
log.Fatal(err)
}
block3CoinbaseAddr = block3.Coinbase()
block3CoinbaseHash = crypto.Keccak256Hash(block3CoinbaseAddr.Bytes())
}
func loadBlockFromRLPFile(filename string) (*types.Block, []byte, error) { blocks := mainnet.GetBlocks()
f, err := os.Open(filename) block0 = blocks[0]
if err != nil { block1 = blocks[1]
return nil, nil, err block2 = blocks[2]
} block3 = blocks[3]
defer f.Close()
blockRLP, err := io.ReadAll(f) // 0x4be8251692195afc818c92b485fcb8a4691af89cbe5a2ab557b83a4261be2a9a
if err != nil { block1CoinbaseHash = crypto.Keccak256Hash(block1.Coinbase().Bytes())
return nil, nil, err // 0x08d4679cbcf198c1741a6f4e4473845659a30caa8b26f8d37a0be2e2bc0d8892
} block2CoinbaseHash = crypto.Keccak256Hash(block2.Coinbase().Bytes())
block := new(types.Block) // 0x6efa174f00e64521a535f35e67c1aa241951c791639b2f3d060f49c5d9fa8b9e
return block, blockRLP, rlp.DecodeBytes(blockRLP, block) block3CoinbaseHash = crypto.Keccak256Hash(block3.Coinbase().Bytes())
} }
func TestBuilderOnMainnetBlocks(t *testing.T) { func TestBuilderOnMainnetBlocks(t *testing.T) {
@ -602,7 +566,9 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
}, },
StorageDiff: emptyStorage, StorageDiff: emptyStorage,
}, },
{ // this is the new account created due to the coinbase mining a block, it's creation shouldn't affect 0x 0e 05 07 {
// this is the new account created due to the coinbase mining a block, its
// creation shouldn't affect 0x 0e 05 07
Removed: false, Removed: false,
AccountWrapper: sdtypes.AccountWrapper{ AccountWrapper: sdtypes.AccountWrapper{
Account: block3CoinbaseAccount, Account: block3CoinbaseAccount,
@ -658,11 +624,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
}, },
} }
test_helpers.RunBuilderTests(t, test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
statediff.NewBuilder(adapt.GethStateView(chain.StateCache())), test_helpers.CheckedRoots{
tests, params, test_helpers.CheckedRoots{ block1: block1RootBranchNode,
block1: block1RootBranchNode, block2: block2RootBranchNode,
block2: block2RootBranchNode, block3: block3RootBranchNode,
block3: block3RootBranchNode, }.Check(t)
})
} }

View File

@ -1,25 +1,30 @@
#!/bin/bash #!/bin/bash
# Builds and deploys a stack with only what we need.
# This script assumes we are running in the project root.
set -eux set -e
cluster="${1:-test}" cluster="${1:-test}"
laconic_so="${LACONIC_SO:-laconic-so} --stack fixturenet-plugeth-tx --verbose" laconic_so="${LACONIC_SO:-laconic-so} --stack fixturenet-plugeth-tx --verbose"
CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}") CONFIG_DIR=$(readlink -f "${CONFIG_DIR:-$(mktemp -d)}")
# By default assume we are running in the project root # Point stack-orchestrator to the multi-project root
export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(realpath $(git rev-parse --show-toplevel)/..)}" export CERC_REPO_BASE_DIR="${CERC_REPO_BASE_DIR:-$(git rev-parse --show-toplevel)/..}"
# v5 migrations only go up to version 18 # v5 migrations only go up to version 18
echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=18 >> $CONFIG_DIR/stack.env echo CERC_STATEDIFF_DB_GOOSE_MIN_VER=18 >> $CONFIG_DIR/stack.env
# Build and deploy a cluster with only what we need from the stack set -x
$laconic_so setup-repositories \
--exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \
--branches-file ./test/stack-refs.txt
$laconic_so build-containers \ if [[ -z $SKIP_BUILD ]]; then
--exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff $laconic_so setup-repositories \
--exclude github.com/dboreham/foundry,github.com/cerc-io/tx-spammer,github.com/cerc-io/ipld-eth-server,git.vdb.to/cerc-io/plugeth,git.vdb.to/cerc-io/plugeth-statediff \
--branches-file ./test/stack-refs.txt
$laconic_so build-containers \
--exclude cerc/ipld-eth-server,cerc/keycloak,cerc/tx-spammer,cerc/foundry,cerc/plugeth,cerc/plugeth-statediff
fi
$laconic_so deploy \ $laconic_so deploy \
--exclude foundry,keycloak,tx-spammer,ipld-eth-server \ --exclude foundry,keycloak,tx-spammer,ipld-eth-server \

View File

@ -166,10 +166,12 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
workers = 1 workers = 1
} }
builder := NewBuilder(blockChain.StateCache())
builder.SetSubtrieWorkers(cfg.SubtrieWorkers)
quitCh := make(chan bool) quitCh := make(chan bool)
sds := &Service{ sds := &Service{
BlockChain: blockChain, BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()), Builder: builder,
QuitChan: quitCh, QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[SubID]Subscription), Subscriptions: make(map[common.Hash]map[SubID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
@ -785,6 +787,10 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
// Writes a state diff from the current block, parent state root, and provided params // Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
}
log := log.New("hash", block.Hash(), "number", block.Number()) log := log.New("hash", block.Hash(), "number", block.Number())
if granted, relinquish := sds.claimExclusiveAccess(block); granted { if granted, relinquish := sds.claimExclusiveAccess(block); granted {
defer relinquish() defer relinquish()
@ -804,9 +810,6 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
start := countStateDiffBegin(block, log) start := countStateDiffBegin(block, log)
defer countStateDiffEnd(start, log, &err) defer countStateDiffEnd(start, log, &err)
if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
}
if params.IncludeTD { if params.IncludeTD {
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
@ -819,23 +822,26 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err return err
} }
output := func(node types2.StateLeafNode) error { var nodeMtx, ipldMtx sync.Mutex
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log, nodeSink := func(node types2.StateLeafNode) error {
metrics.IndexerMetrics.OutputTimer) defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
nodeMtx.Lock()
defer nodeMtx.Unlock()
return sds.indexer.PushStateNode(tx, node, block.Hash().String()) return sds.indexer.PushStateNode(tx, node, block.Hash().String())
} }
ipldOutput := func(c types2.IPLD) error { ipldSink := func(c types2.IPLD) error {
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log, defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.IPLDOutputTimer)
metrics.IndexerMetrics.IPLDOutputTimer) ipldMtx.Lock()
defer ipldMtx.Unlock()
return sds.indexer.PushIPLD(tx, c) return sds.indexer.PushIPLD(tx, c)
} }
err = sds.Builder.WriteStateDiffObject(Args{ err = sds.Builder.WriteStateDiff(Args{
NewStateRoot: block.Root(), NewStateRoot: block.Root(),
OldStateRoot: parentRoot, OldStateRoot: parentRoot,
BlockHash: block.Hash(), BlockHash: block.Hash(),
BlockNumber: block.Number(), BlockNumber: block.Number(),
}, params, output, ipldOutput) }, params, nodeSink, ipldSink)
// TODO this anti-pattern needs to be sorted out eventually // TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil { if err = tx.Submit(err); err != nil {
@ -885,7 +891,6 @@ func (sds *Service) UnsubscribeWriteStatus(id SubID) {
// add | remove | set | clear // add | remove | set | clear
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
sds.writeLoopParams.Lock() sds.writeLoopParams.Lock()
log.Debug("WatchAddress: locked sds.writeLoopParams")
defer sds.writeLoopParams.Unlock() defer sds.writeLoopParams.Unlock()
// get the current block number // get the current block number

View File

@ -41,7 +41,7 @@ import (
) )
func init() { func init() {
test_helpers.SilenceLogs() test_helpers.QuietLogs()
} }
func TestServiceLoop(t *testing.T) { func TestServiceLoop(t *testing.T) {

View File

@ -1 +1 @@
github.com/cerc-io/ipld-eth-db v5.0.2-alpha github.com/cerc-io/ipld-eth-db v5.0.5-alpha

View File

@ -2,15 +2,18 @@ package test_helpers
import ( import (
"bytes" "bytes"
"encoding/json" "fmt"
"math/big"
"sort" "sort"
"testing" "testing"
"github.com/cerc-io/plugeth-statediff" statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types" sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -20,52 +23,67 @@ type TestCase struct {
Expected *sdtypes.StateObject Expected *sdtypes.StateObject
} }
type CheckedRoots = map[*types.Block][]byte type CheckedRoots map[*types.Block][]byte
// Replicates the statediff object, but indexes nodes by CID
type normalizedStateDiff struct {
BlockNumber *big.Int
BlockHash common.Hash
Nodes map[string]sdtypes.StateLeafNode
IPLDs map[string]sdtypes.IPLD
}
func RunBuilderTests( func RunBuilderTests(
t *testing.T, t *testing.T,
builder statediff.Builder, sdb state.Database,
tests []TestCase, tests []TestCase,
params statediff.Params, params statediff.Params,
roots CheckedRoots, subtrieCounts []uint,
) { ) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, test := range tests { for _, test := range tests {
diff, err := builder.BuildStateDiffObject(test.Args, params) for _, subtries := range subtrieCounts {
if err != nil { t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) {
t.Error(err) builder.SetSubtrieWorkers(subtries)
} diff, err := builder.BuildStateDiffObject(test.Args, params)
receivedStateDiffRlp, err := rlp.EncodeToBytes(&diff) if err != nil {
if err != nil { t.Error(err)
t.Error(err) }
} require.Equal(t,
expectedStateDiffRlp, err := rlp.EncodeToBytes(test.Expected) normalize(test.Expected),
if err != nil { normalize(&diff),
t.Error(err) )
} })
sort.Slice(receivedStateDiffRlp, func(i, j int) bool {
return receivedStateDiffRlp[i] < receivedStateDiffRlp[j]
})
sort.Slice(expectedStateDiffRlp, func(i, j int) bool {
return expectedStateDiffRlp[i] < expectedStateDiffRlp[j]
})
if !bytes.Equal(receivedStateDiffRlp, expectedStateDiffRlp) {
actualb, err := json.Marshal(diff)
require.NoError(t, err)
expectedb, err := json.Marshal(test.Expected)
require.NoError(t, err)
var expected, actual interface{}
err = json.Unmarshal(expectedb, &expected)
require.NoError(t, err)
err = json.Unmarshal(actualb, &actual)
require.NoError(t, err)
require.Equal(t, expected, actual, test.Name)
} }
} }
}
func (roots CheckedRoots) Check(t *testing.T) {
// Let's also confirm that our root state nodes form the state root hash in the headers // Let's also confirm that our root state nodes form the state root hash in the headers
for block, node := range roots { for block, node := range roots {
require.Equal(t, block.Root(), crypto.Keccak256Hash(node), require.Equal(t, block.Root(), crypto.Keccak256Hash(node),
"expected root does not match actual root", block.Number()) "expected root does not match actual root", block.Number())
} }
} }
func normalize(diff *sdtypes.StateObject) normalizedStateDiff {
norm := normalizedStateDiff{
BlockNumber: diff.BlockNumber,
BlockHash: diff.BlockHash,
Nodes: make(map[string]sdtypes.StateLeafNode),
IPLDs: make(map[string]sdtypes.IPLD),
}
for _, node := range diff.Nodes {
sort.Slice(node.StorageDiff, func(i, j int) bool {
return bytes.Compare(
node.StorageDiff[i].LeafKey,
node.StorageDiff[j].LeafKey,
) < 0
})
norm.Nodes[node.AccountWrapper.CID] = node
}
for _, ipld := range diff.IPLDs {
norm.IPLDs[ipld.CID] = ipld
}
return norm
}

View File

@ -40,7 +40,7 @@ func (builder *Builder) BuildStateDiffObject(args statediff.Args, params statedi
} }
// BuildStateDiffObject mock method // BuildStateDiffObject mock method
func (builder *Builder) WriteStateDiffObject(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error { func (builder *Builder) WriteStateDiff(args statediff.Args, params statediff.Params, output sdtypes.StateNodeSink, iplds sdtypes.IPLDSink) error {
builder.Args = args builder.Args = args
builder.Params = params builder.Params = params

View File

@ -6,8 +6,9 @@ import (
"github.com/cerc-io/plugeth-statediff/utils/log" "github.com/cerc-io/plugeth-statediff/utils/log"
) )
// The geth sync logs are noisy, it can be useful to silence them // QuietLogs silences the geth logs and sets the plugin test log level to "warning"
func SilenceLogs() { // The geth sync logs are noisy, so it can be nice to silence them.
func QuietLogs() {
geth_log.Root().SetHandler(geth_log.DiscardHandler()) geth_log.Root().SetHandler(geth_log.DiscardHandler())
log.TestLogger.SetLevel(2) log.TestLogger.SetLevel(2)
} }

View File

@ -1,80 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Contains a batch of utility type declarations used by the tests. As the node
// operates on unique types, a lot of them are needed to check various features.
package trie_helpers
import (
"sort"
"strings"
"time"
metrics2 "github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/types"
)
// SortKeys sorts the keys in the account map
func SortKeys(data types.AccountMap) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.SortKeysTimer)
keys := make([]string, 0, len(data))
for key := range data {
keys = append(keys, key)
}
sort.Strings(keys)
return keys
}
// FindIntersection finds the set of strings from both arrays that are equivalent
// a and b must first be sorted
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func FindIntersection(a, b []string) []string {
defer metrics2.UpdateDuration(time.Now(), metrics2.IndexerMetrics.FindIntersectionTimer)
lenA := len(a)
lenB := len(b)
iOfA, iOfB := 0, 0
updates := make([]string, 0)
if iOfA >= lenA || iOfB >= lenB {
return updates
}
for {
switch strings.Compare(a[iOfA], b[iOfB]) {
// -1 when a[iOfA] < b[iOfB]
case -1:
iOfA++
if iOfA >= lenA {
return updates
}
// 0 when a[iOfA] == b[iOfB]
case 0:
updates = append(updates, a[iOfA])
iOfA++
iOfB++
if iOfA >= lenA || iOfB >= lenB {
return updates
}
// 1 when a[iOfA] > b[iOfB]
case 1:
iOfB++
if iOfB >= lenB {
return updates
}
}
}
}

View File

@ -1 +1,174 @@
package utils package utils
import (
"bytes"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/trie"
)
type symmDiffIterator struct {
a, b iterState // Nodes returned are those in b - a and a - b (keys only)
yieldFromA bool // Whether next node comes from a
count int // Number of nodes scanned on either trie
eqPathIndex int // Count index of last pair of equal paths, to detect an updated key
}
// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric difference
// of elements in a and b, i.e., the elements in a that are not in b, and vice versa.
// Returns the iterator, and a pointer to an integer recording the number of nodes seen.
func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) (*symmDiffIterator, *int) {
it := &symmDiffIterator{
a: iterState{a, true},
b: iterState{b, true},
// common paths are detected by a distance <=1 from this index, so put it out of reach
eqPathIndex: -2,
}
return it, &it.count
}
// pairs an iterator with a cache of its valid status
type iterState struct {
trie.NodeIterator
valid bool
}
func (st *iterState) Next(descend bool) bool {
st.valid = st.NodeIterator.Next(descend)
return st.valid
}
func (it *symmDiffIterator) curr() *iterState {
if it.yieldFromA {
return &it.a
}
return &it.b
}
// FromA returns true if the current node is sourced from A.
func (it *symmDiffIterator) FromA() bool {
return it.yieldFromA
}
// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it
// represents an updated node.
func (it *symmDiffIterator) CommonPath() bool {
return it.count-it.eqPathIndex <= 1
}
func (it *symmDiffIterator) Hash() common.Hash {
return it.curr().Hash()
}
func (it *symmDiffIterator) Parent() common.Hash {
return it.curr().Parent()
}
func (it *symmDiffIterator) Leaf() bool {
return it.curr().Leaf()
}
func (it *symmDiffIterator) LeafKey() []byte {
return it.curr().LeafKey()
}
func (it *symmDiffIterator) LeafBlob() []byte {
return it.curr().LeafBlob()
}
func (it *symmDiffIterator) LeafProof() [][]byte {
return it.curr().LeafProof()
}
func (it *symmDiffIterator) Path() []byte {
return it.curr().Path()
}
func (it *symmDiffIterator) NodeBlob() []byte {
return it.curr().NodeBlob()
}
func (it *symmDiffIterator) AddResolver(resolver trie.NodeResolver) {
panic("not implemented")
}
func (it *symmDiffIterator) Next(bool) bool {
// NodeIterators start in a "pre-valid" state, so the first Next advances to a valid node.
if it.count == 0 {
if it.a.Next(true) {
it.count++
}
if it.b.Next(true) {
it.count++
}
} else {
if it.curr().Next(true) {
it.count++
}
}
it.seek()
return it.a.valid || it.b.valid
}
func (it *symmDiffIterator) seek() {
// Invariants:
// - At the end of the function, the sub-iterator with the lexically lesser path
// points to the next element
// - Said sub-iterator never points to an element present in the other
for {
if !it.b.valid {
it.yieldFromA = true
return
}
if !it.a.valid {
it.yieldFromA = false
return
}
cmp := bytes.Compare(it.a.Path(), it.b.Path())
if cmp == 0 {
it.eqPathIndex = it.count
cmp = compareNodes(&it.a, &it.b)
}
switch cmp {
case -1:
it.yieldFromA = true
return
case 1:
it.yieldFromA = false
return
case 0:
// if A and B have the same path and non-zero hash, they are identical and we can skip
// the whole subtree
noHash := it.a.Hash() == common.Hash{}
if it.a.Next(noHash) {
it.count++
}
if it.b.Next(noHash) {
it.count++
}
}
}
}
func (it *symmDiffIterator) Error() error {
if err := it.a.Error(); err != nil {
return err
}
return it.b.Error()
}
func compareNodes(a, b trie.NodeIterator) int {
if a.Leaf() && !b.Leaf() {
return -1
} else if b.Leaf() && !a.Leaf() {
return 1
}
if cmp := bytes.Compare(a.Hash().Bytes(), b.Hash().Bytes()); cmp != 0 {
return cmp
}
if a.Leaf() && b.Leaf() {
return bytes.Compare(a.LeafBlob(), b.LeafBlob())
}
return 0
}

225
utils/iterator_test.go Normal file
View File

@ -0,0 +1,225 @@
package utils_test
import (
"testing"
"github.com/cerc-io/eth-testing/chaindata/mainnet"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/trie"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/test_helpers"
"github.com/cerc-io/plugeth-statediff/utils"
)
type kvs struct{ k, v string }
var (
testdata1 = []kvs{
{"barb", "ba"},
{"bard", "bc"},
{"bars", "bb"},
{"bar", "b"},
{"fab", "z"},
{"food", "ab"},
{"foo", "a"},
}
testdata2 = []kvs{
{"aardvark", "c"},
{"bar", "b"},
{"barb", "bd"},
{"bars", "be"},
{"fab", "z"},
{"foo", "a"},
{"foos", "aa"},
{"jars", "d"},
}
)
func TestSymmetricDifferenceIterator(t *testing.T) {
t.Run("with no difference", func(t *testing.T) {
db := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(db)
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 0, *count)
triea.MustUpdate([]byte("foo"), []byte("bar"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 2, *count)
// TODO will fail until fixed https://github.com/ethereum/go-ethereum/pull/27838
trieb := trie.NewEmpty(db)
di, count = utils.NewSymmetricDifferenceIterator(
triea.NodeIterator([]byte("jars")),
trieb.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
}
assert.Equal(t, 0, *count)
// // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838
// di, count = utils.NewSymmetricDifferenceIterator(
// triea.NodeIterator([]byte("food")),
// trieb.NodeIterator(nil))
// for di.Next(true) {
// t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
// }
// assert.Equal(t, 0, *count)
})
t.Run("small difference", func(t *testing.T) {
dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(dba)
dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
trieb := trie.NewEmpty(dbb)
trieb.MustUpdate([]byte("foo"), []byte("bar"))
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
leaves := 0
for di.Next(true) {
if di.Leaf() {
assert.False(t, di.CommonPath())
assert.Equal(t, "foo", string(di.LeafKey()))
assert.Equal(t, "bar", string(di.LeafBlob()))
leaves++
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 2, *count)
trieb.MustUpdate([]byte("quux"), []byte("bars"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux")))
leaves = 0
for di.Next(true) {
if di.Leaf() {
assert.False(t, di.CommonPath())
assert.Equal(t, "quux", string(di.LeafKey()))
assert.Equal(t, "bars", string(di.LeafBlob()))
leaves++
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 1, *count)
})
dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(dba)
for _, val := range testdata1 {
triea.MustUpdate([]byte(val.k), []byte(val.v))
}
dbb := trie.NewDatabase(rawdb.NewMemoryDatabase())
trieb := trie.NewEmpty(dbb)
for _, val := range testdata2 {
trieb.MustUpdate([]byte(val.k), []byte(val.v))
}
onlyA := make(map[string]string)
onlyB := make(map[string]string)
var deletions, creations []string
it, _ := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
for it.Next(true) {
if !it.Leaf() {
continue
}
key, value := string(it.LeafKey()), string(it.LeafBlob())
if it.FromA() {
onlyA[key] = value
if !it.CommonPath() {
deletions = append(deletions, key)
}
} else {
onlyB[key] = value
if !it.CommonPath() {
creations = append(creations, key)
}
}
}
expectedOnlyA := map[string]string{
"barb": "ba",
"bard": "bc",
"bars": "bb",
"food": "ab",
}
expectedOnlyB := map[string]string{
"aardvark": "c",
"barb": "bd",
"bars": "be",
"foos": "aa",
"jars": "d",
}
expectedDeletions := []string{
"bard",
"food",
}
expectedCreations := []string{
"aardvark",
"foos",
"jars",
}
assert.Equal(t, expectedOnlyA, onlyA)
assert.Equal(t, expectedOnlyB, onlyB)
assert.Equal(t, expectedDeletions, deletions)
assert.Equal(t, expectedCreations, creations)
}
// compare the paths traversed by the geth difference iterator and symmetric difference iterator
// within a sample of mainnet data.
func TestCompareDifferenceIterators(t *testing.T) {
test_helpers.QuietLogs()
db := rawdb.NewMemoryDatabase()
core.DefaultGenesisBlock().MustCommit(db)
blocks := mainnet.GetBlocks()
chain, _ := core.NewBlockChain(db, nil, nil, nil, ethash.NewFaker(), vm.Config{}, nil, nil)
_, err := chain.InsertChain(blocks[1:])
if err != nil {
t.Fatal(err)
}
treeA, err := chain.StateCache().OpenTrie(blocks[1].Root())
if err != nil {
t.Fatal(err)
}
treeB, err := chain.StateCache().OpenTrie(blocks[2].Root())
if err != nil {
t.Fatal(err)
}
// collect the paths of nodes exclusive to A and B separately, then make sure the symmetric
// iterator produces the same sets
var pathsA, pathsB [][]byte
itBonly, _ := trie.NewDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
for itBonly.Next(true) {
pathsB = append(pathsB, itBonly.Path())
}
itAonly, _ := trie.NewDifferenceIterator(treeB.NodeIterator(nil), treeA.NodeIterator(nil))
for itAonly.Next(true) {
pathsA = append(pathsA, itAonly.Path())
}
itSym, _ := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
var idxA, idxB int
for itSym.Next(true) {
if itSym.FromA() {
require.Equal(t, pathsA[idxA], itSym.Path())
idxA++
} else {
require.Equal(t, pathsB[idxB], itSym.Path())
idxB++
}
}
require.Equal(t, len(pathsA), idxA)
require.Equal(t, len(pathsB), idxB)
}

View File

@ -1,9 +1,12 @@
package utils package utils
import ( import (
"encoding/json"
"fmt" "fmt"
"os" "os"
"github.com/cerc-io/plugeth-statediff/utils/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
) )
@ -21,3 +24,38 @@ func MustDecode[T any](buf []byte) *T {
} }
return &ret return &ret
} }
// LoadConfig loads chain config from json file
func LoadConfig(chainConfigPath string) (*params.ChainConfig, error) {
file, err := os.Open(chainConfigPath)
if err != nil {
log.Error("Failed to read chain config file", "error", err)
return nil, err
}
defer file.Close()
chainConfig := new(params.ChainConfig)
if err := json.NewDecoder(file).Decode(chainConfig); err != nil {
log.Error("invalid chain config file", "error", err)
return nil, err
}
log.Debug("Using chain config", "path", chainConfigPath, "content", chainConfig)
return chainConfig, nil
}
// ChainConfig returns the appropriate ethereum chain config for the provided chain id
func ChainConfig(chainID uint64) (*params.ChainConfig, error) {
switch chainID {
case 1:
return params.MainnetChainConfig, nil
case 4:
return params.RinkebyChainConfig, nil
case 5:
return params.GoerliChainConfig, nil
default:
return nil, fmt.Errorf("chain config for chainid %d not available", chainID)
}
}