Add WriteStateSnapshot #15

Merged
roysc merged 23 commits from with-iterator-tracker into main 2023-09-28 03:35:47 +00:00
40 changed files with 1222 additions and 453 deletions

View File

@ -89,3 +89,47 @@ jobs:
pip install pytest
pip install -r requirements.txt
pytest -v -k test_basic_db
compliance-test:
name: Run compliance tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
path: ./plugeth-statediff
- uses: actions/checkout@v3
with:
repository: cerc-io/eth-statediff-compliance
ref: v0.1.0
path: ./eth-statediff-compliance
token: ${{ secrets.CICD_REPO_TOKEN }}
- uses: actions/setup-go@v4
with:
go-version-file: './eth-statediff-compliance/go.mod'
check-latest: true
- name: Install jq
run: apt-get update && apt-get install -yq jq
- name: Set up Gitea access token
env:
TOKEN: ${{ secrets.CICD_REPO_TOKEN }}
run: |
git config --global url."https://$TOKEN:@git.vdb.to/".insteadOf https://git.vdb.to/
- name: Update go.mod for dumpdiff-geth
working-directory: ./eth-statediff-compliance/
run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-geth/
- name: Update go.mod for dumpdiff-plugeth
working-directory: ./eth-statediff-compliance/
run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-plugeth/
- name: Update go.mod for dumpdiff-plugeth-parallel
working-directory: ./eth-statediff-compliance/
run: ./scripts/update-mod.sh ../plugeth-statediff dumpdiff-plugeth-parallel/
- name: Build tools
working-directory: ./eth-statediff-compliance/
run: make all
- name: Compare output of geth and plugeth
working-directory: ./eth-statediff-compliance/
run: ./scripts/compare-diffs.sh geth plugeth
- name: Compare output of geth and plugeth-parallel
working-directory: ./eth-statediff-compliance/
run: ./scripts/compare-diffs.sh geth plugeth-parallel

View File

@ -27,6 +27,7 @@ import (
"time"
iterutils "github.com/cerc-io/eth-iterator-utils"
"github.com/cerc-io/eth-iterator-utils/tracker"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -58,7 +59,7 @@ type Builder interface {
WriteStateDiff(Args, Params, sdtypes.StateNodeSink, sdtypes.IPLDSink) error
}
type StateDiffBuilder struct {
type builder struct {
// state cache is safe for concurrent reads
stateCache adapt.StateView
subtrieWorkers uint
@ -88,8 +89,8 @@ func syncedAppender[T any](to *[]T) func(T) error {
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
return &StateDiffBuilder{
func NewBuilder(stateCache adapt.StateView) *builder {
return &builder{
stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers,
}
@ -97,7 +98,7 @@ func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
// Passing 0 will reset this to the default value.
func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
func (sdb *builder) SetSubtrieWorkers(n uint) {
if n == 0 {
n = defaultSubtrieWorkers
}
@ -105,7 +106,7 @@ func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
func (sdb *builder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD
@ -122,7 +123,7 @@ func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdt
}
// WriteStateDiff writes a statediff object to output sinks
func (sdb *StateDiffBuilder) WriteStateDiff(
func (sdb *builder) WriteStateDiff(
args Args, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
@ -141,14 +142,16 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
// errgroup will cancel if any gr fails
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) {
g.Go(func() error {
a, b := subitersA[subdiv], subitersB[subdiv]
it := utils.NewSymmetricDifferenceIterator(a, b)
return sdb.processAccounts(ctx,
a, b, params.watchedAddressesLeafPaths,
it, &it.SymmDiffState,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
@ -157,11 +160,60 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
return g.Wait()
}
// processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key.
func (sdb *StateDiffBuilder) processAccounts(
// WriteStateSnapshot writes a snapshot of the state trie at the given root to the output sinks
func (sdb *builder) WriteStateSnapshot(
stateRoot common.Hash, params Params,
nodeSink sdtypes.StateNodeSink,
ipldSink sdtypes.IPLDSink,
tracker tracker.IteratorTracker,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load the trie for the given state root
tree, err := sdb.stateCache.OpenTrie(stateRoot)
if err != nil {
return fmt.Errorf("error opening new state trie: %w", err)
}
subiters, _, err := tracker.Restore(tree.NodeIterator)
telackey marked this conversation as resolved
Review

I didn't see a test case where the iterators are actually restored. Is there one? Perhaps it exists in ipld-eth-state-snapshot?

I didn't see a test case where the iterators are actually restored. Is there one? Perhaps it exists in ipld-eth-state-snapshot?
Review

It happens in the TestBuilderSnapshot tests, via test_helpers.RunStateSnapshot. Although there's no explicit check for the file in these tests, I do see them getting restored.

It happens in the `TestBuilderSnapshot` tests, via `test_helpers.RunStateSnapshot`. Although there's no explicit check for the file in these tests, I do see them getting restored.
Review

Added a check for the file

Added a check for the file
Review

Great! I see it now. I read that test, but I didn't put 2 and 2 together regarding the interrupt.

Great! I see it now. I read that test, but I didn't put 2 and 2 together regarding the interrupt.
Review

Yeah, a check for the file is even better, but you are right, practically speaking even the previous version of the test couldn't have succeeded if the file were not created and used.

Yeah, a check for the file is even better, but you are right, practically speaking even the previous version of the test couldn't have succeeded if the file were not created and used.
if err != nil {
return fmt.Errorf("error restoring iterators: %w", err)
}
if len(subiters) != 0 {
// Completed iterators are not saved by the tracker, so restoring fewer than configured is ok,
// but having too many is a problem.
if len(subiters) > int(sdb.subtrieWorkers) {
return fmt.Errorf("restored too many iterators: expected %d, got %d",
sdb.subtrieWorkers, len(subiters))
}
} else {
subiters = iterutils.SubtrieIterators(tree.NodeIterator, uint(sdb.subtrieWorkers))
for i := range subiters {
subiters[i] = tracker.Tracked(subiters[i])
}
}
// errgroup will cancel if any group fails
g, ctx := errgroup.WithContext(context.Background())
for i := range subiters {
func(subdiv uint) {
g.Go(func() error {
symdiff := utils.AlwaysBState()
return sdb.processAccounts(ctx,
subiters[subdiv], &symdiff,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, log.DefaultLogger,
)
})
}(uint(i))
}
return g.Wait()
}
// processAccounts processes account creations, deletions, and updates
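// symdiff reports, for the node at the iterator's current position, whether it comes from trie A
// and whether its path is common to both tries; when diffing, it is the state embedded in the
// symmetric difference iterator passed as it.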
func (sdb *builder) processAccounts(
ctx context.Context,
a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte,
it trie.NodeIterator, symdiff *utils.SymmDiffState,
watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) error {
@ -171,9 +223,7 @@ func (sdb *StateDiffBuilder) processAccounts(
updates := make(accountUpdateMap)
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte
it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
prevBlob = it.NodeBlob()
var prevBlob = it.NodeBlob()
for it.Next(true) {
select {
case <-ctx.Done():
@ -185,7 +235,7 @@ func (sdb *StateDiffBuilder) processAccounts(
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
}
if it.FromA() { // Node exists in the old trie
if symdiff.FromA() { // Node exists in the old trie
if it.Leaf() {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
@ -194,7 +244,7 @@ func (sdb *StateDiffBuilder) processAccounts(
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
if it.CommonPath() {
if symdiff.CommonPath() {
// If B also contains this leaf node, this is the old state of an updated account.
if update, ok := updates[string(leafKey)]; ok {
update.oldRoot = account.Root
@ -212,14 +262,14 @@ func (sdb *StateDiffBuilder) processAccounts(
}
continue
}
// Node exists in the new trie
// Node exists in the new trie (B)
if it.Leaf() {
accountW, err := sdb.decodeStateLeaf(it, prevBlob)
if err != nil {
return err
}
if it.CommonPath() {
if symdiff.CommonPath() {
// If A also contains this leaf node, this is the new state of an updated account.
if update, ok := updates[string(accountW.LeafKey)]; ok {
update.new = *accountW
@ -232,42 +282,41 @@ func (sdb *StateDiffBuilder) processAccounts(
return err
}
}
} else {
// New trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
if it.Hash() == zeroHash {
continue
}
// TODO - this can be handled when value node is (created?)
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path
if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return err
}
ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
continue
}
// New inner trie nodes will be written to blockstore only.
// Reminder: this includes leaf nodes, since the geth iterator.Leaf() actually
// signifies a "value" node.
if it.Hash() == zeroHash {
continue
}
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path
if len(watchedAddressesLeafPaths) > 0 {
var elements []interface{}
if err := rlp.DecodeBytes(nodeVal, &elements); err != nil {
return err
}
prevBlob = nodeVal
ok, err := isLeaf(elements)
if err != nil {
return err
}
if ok {
partialPath := utils.CompactToHex(elements[0].([]byte))
valueNodePath := append(it.Path(), partialPath...)
if !isWatchedPath(watchedAddressesLeafPaths, valueNodePath) {
continue
}
}
}
if err := ipldSink(sdtypes.IPLD{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, it.Hash().Bytes()).String(),
Content: nodeVal,
}); err != nil {
return err
}
prevBlob = nodeVal
}
for key, update := range updates {
@ -287,12 +336,10 @@ func (sdb *StateDiffBuilder) processAccounts(
return err
}
}
metrics.IndexerMetrics.DifferenceIteratorCounter.Inc(int64(*itCount))
return it.Error()
}
func (sdb *StateDiffBuilder) processAccountDeletion(
func (sdb *builder) processAccountDeletion(
leafKey []byte, account types.StateAccount, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
@ -309,7 +356,7 @@ func (sdb *StateDiffBuilder) processAccountDeletion(
return nodeSink(diff)
}
func (sdb *StateDiffBuilder) processAccountCreation(
func (sdb *builder) processAccountCreation(
accountW *sdtypes.AccountWrapper, ipldSink sdtypes.IPLDSink, nodeSink sdtypes.StateNodeSink,
) error {
diff := sdtypes.StateLeafNode{
@ -340,7 +387,7 @@ func (sdb *StateDiffBuilder) processAccountCreation(
// decodes account at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
func (sdb *builder) decodeStateLeaf(it trie.NodeIterator, parentBlob []byte) (*sdtypes.AccountWrapper, error) {
var account types.StateAccount
if err := rlp.DecodeBytes(it.LeafBlob(), &account); err != nil {
return nil, fmt.Errorf("error decoding account at leaf key %x: %w", it.LeafKey(), err)
@ -357,7 +404,7 @@ func (sdb *StateDiffBuilder) decodeStateLeaf(it trie.NodeIterator, parentBlob []
// processStorageCreations processes the storage node records for a newly created account
// i.e. it returns all the storage nodes at this state, since there is no previous state.
func (sdb *StateDiffBuilder) processStorageCreations(
func (sdb *builder) processStorageCreations(
sr common.Hash, storageSink sdtypes.StorageNodeSink, ipldSink sdtypes.IPLDSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessStorageCreationsTimer)
@ -394,8 +441,9 @@ func (sdb *StateDiffBuilder) processStorageCreations(
return it.Error()
}
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a different state at B than A
func (sdb *StateDiffBuilder) processStorageUpdates(
// processStorageUpdates builds the storage diff node objects for all nodes that exist in a
// different state at B than A
func (sdb *builder) processStorageUpdates(
oldroot common.Hash, newroot common.Hash,
storageSink sdtypes.StorageNodeSink,
ipldSink sdtypes.IPLDSink,
@ -416,7 +464,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
var prevBlob []byte
a, b := oldTrie.NodeIterator(nil), newTrie.NodeIterator(nil)
it, _ := utils.NewSymmetricDifferenceIterator(a, b)
it := utils.NewSymmetricDifferenceIterator(a, b)
for it.Next(true) {
if it.FromA() {
if it.Leaf() && !it.CommonPath() {
@ -457,7 +505,7 @@ func (sdb *StateDiffBuilder) processStorageUpdates(
}
// processRemovedAccountStorage builds the "removed" diffs for all the storage nodes for a destroyed account
func (sdb *StateDiffBuilder) processRemovedAccountStorage(
func (sdb *builder) processRemovedAccountStorage(
sr common.Hash, storageSink sdtypes.StorageNodeSink,
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.ProcessRemovedAccountStorageTimer)
@ -491,7 +539,7 @@ func (sdb *StateDiffBuilder) processRemovedAccountStorage(
// decodes slot at leaf and encodes RLP data to CID
// reminder: it.Leaf() == true when the iterator is positioned at a "value node" (which is not something
// that actually exists in an MMPT), therefore we pass the parent node blob as the leaf RLP.
func (sdb *StateDiffBuilder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
func (sdb *builder) decodeStorageLeaf(it trie.NodeIterator, parentBlob []byte) sdtypes.StorageLeafNode {
leafKey := make([]byte, len(it.LeafKey()))
copy(leafKey, it.LeafKey())
value := make([]byte, len(it.LeafBlob()))

538
builder_snapshot_test.go Normal file
View File

@ -0,0 +1,538 @@
package statediff_test
import (
"testing"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/test_helpers"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
func TestBuilderSnapshot(t *testing.T) {
blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen)
contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr)
defer chain.Stop()
block0 = test_helpers.Genesis
block1 = blocks[0]
block2 = blocks[1]
block3 = blocks[2]
params := statediff.Params{}
tests := []test_helpers.SnapshotTestCase{
{
"testEmptyDiff",
common.Hash{},
&sdtypes.StateObject{
Nodes: emptyDiffs,
},
},
{
"testBlock0",
//genesis state: the bank account is the only account expected in the snapshot
block0.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: bankAccountAtBlock0,
LeafKey: test_helpers.BankLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock0LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock0LeafNode)).String(),
Content: bankAccountAtBlock0LeafNode,
},
},
},
},
{
"testBlock1",
//10000 transferred from testBankAddress to account1Addr
block1.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: bankAccountAtBlock1,
LeafKey: test_helpers.BankLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock1LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: minerAccountAtBlock1,
LeafKey: minerLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock1LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock1,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1BranchRootNode)).String(),
Content: block1BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock1LeafNode)).String(),
Content: bankAccountAtBlock1LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock1LeafNode)).String(),
Content: minerAccountAtBlock1LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String(),
Content: account1AtBlock1LeafNode,
},
},
},
},
{
"testBlock2",
//1000 transferred from testBankAddress to account1Addr
//1000 transferred from account1Addr to account2Addr
block2.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: bankAccountAtBlock2,
LeafKey: test_helpers.BankLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: minerAccountAtBlock2,
LeafKey: minerLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock2,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: contractAccountAtBlock2,
LeafKey: contractLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String()},
StorageDiff: []sdtypes.StorageLeafNode{
{
Removed: false,
Value: slot0StorageValue,
LeafKey: slot0StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot1StorageValue,
LeafKey: slot1StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
},
},
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account2AtBlock2,
LeafKey: test_helpers.Account2LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(),
Content: test_helpers.ByteCodeAfterDeployment,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block2BranchRootNode)).String(),
Content: block2BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock2LeafNode)).String(),
Content: bankAccountAtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String(),
Content: minerAccountAtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(),
Content: account1AtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(),
Content: contractAccountAtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block2StorageBranchRootNode)).String(),
Content: block2StorageBranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
Content: slot0StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
Content: slot1StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock2LeafNode)).String(),
Content: account2AtBlock2LeafNode,
},
},
},
},
{
"testBlock3",
//the contract's storage is changed
//and the block is mined by account 2
block3.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: minerAccountAtBlock2,
LeafKey: minerLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock2,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: bankAccountAtBlock3,
LeafKey: test_helpers.BankLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock3LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: contractAccountAtBlock3,
LeafKey: contractLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String()},
StorageDiff: []sdtypes.StorageLeafNode{
{
Removed: false,
Value: slot0StorageValue,
LeafKey: slot0StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot1StorageValue,
LeafKey: slot1StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot3StorageValue,
LeafKey: slot3StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(),
},
},
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account2AtBlock3,
LeafKey: test_helpers.Account2LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock3LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(),
Content: test_helpers.ByteCodeAfterDeployment,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(minerAccountAtBlock2LeafNode)).String(),
Content: minerAccountAtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(),
Content: account1AtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
Content: slot0StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
Content: slot1StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block3BranchRootNode)).String(),
Content: block3BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(bankAccountAtBlock3LeafNode)).String(),
Content: bankAccountAtBlock3LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String(),
Content: contractAccountAtBlock3LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block3StorageBranchRootNode)).String(),
Content: block3StorageBranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(),
Content: slot3StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account2AtBlock3LeafNode)).String(),
Content: account2AtBlock3LeafNode,
},
},
},
},
}
for _, test := range tests {
test_helpers.RunStateSnapshot(t, chain.StateCache(), test, params)
}
}
func TestBuilderSnapshotWithWatchedAddressList(t *testing.T) {
blocks, chain := test_helpers.MakeChain(3, test_helpers.Genesis, test_helpers.TestChainGen)
contractLeafKey = test_helpers.AddressToLeafKey(test_helpers.ContractAddr)
defer chain.Stop()
block0 = test_helpers.Genesis
block1 = blocks[0]
block2 = blocks[1]
block3 = blocks[2]
params := statediff.Params{
WatchedAddresses: []common.Address{test_helpers.Account1Addr, test_helpers.ContractAddr},
}
params.ComputeWatchedAddressesLeafPaths()
var tests = []test_helpers.SnapshotTestCase{
{
"testBlock0",
//genesis state: no nodes are expected for the watched addresses
block0.Root(),
&sdtypes.StateObject{
Nodes: emptyDiffs,
},
},
{
"testBlock1",
//10000 transferred from testBankAddress to account1Addr
block1.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock1,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block1BranchRootNode)).String(),
Content: block1BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock1LeafNode)).String(),
Content: account1AtBlock1LeafNode,
},
},
},
},
{
"testBlock2",
//1000 transferred from testBankAddress to account1Addr
//1000 transferred from account1Addr to account2Addr
block2.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: contractAccountAtBlock2,
LeafKey: contractLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(),
},
StorageDiff: []sdtypes.StorageLeafNode{
{
Removed: false,
Value: slot0StorageValue,
LeafKey: slot0StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot1StorageValue,
LeafKey: slot1StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
},
},
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock2,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(),
Content: test_helpers.ByteCodeAfterDeployment,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block2BranchRootNode)).String(),
Content: block2BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock2LeafNode)).String(),
Content: contractAccountAtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block2StorageBranchRootNode)).String(),
Content: block2StorageBranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
Content: slot0StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
Content: slot1StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(),
Content: account1AtBlock2LeafNode,
},
},
},
},
{
"testBlock3",
//the contract's storage is changed
//and the block is mined by account 2
block3.Root(),
&sdtypes.StateObject{
Nodes: []sdtypes.StateLeafNode{
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: account1AtBlock2,
LeafKey: test_helpers.Account1LeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String()},
StorageDiff: emptyStorage,
},
{
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: contractAccountAtBlock3,
LeafKey: contractLeafKey,
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String()},
StorageDiff: []sdtypes.StorageLeafNode{
{
Removed: false,
Value: slot0StorageValue,
LeafKey: slot0StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot1StorageValue,
LeafKey: slot1StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
},
{
Removed: false,
Value: slot3StorageValue,
LeafKey: slot3StorageKey.Bytes(),
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(),
},
},
},
},
IPLDs: []sdtypes.IPLD{
{
CID: ipld.Keccak256ToCid(ipld.RawBinary, test_helpers.CodeHash.Bytes()).String(),
Content: test_helpers.ByteCodeAfterDeployment,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(account1AtBlock2LeafNode)).String(),
Content: account1AtBlock2LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot0StorageLeafNode)).String(),
Content: slot0StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot1StorageLeafNode)).String(),
Content: slot1StorageLeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(block3BranchRootNode)).String(),
Content: block3BranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStateTrie, crypto.Keccak256(contractAccountAtBlock3LeafNode)).String(),
Content: contractAccountAtBlock3LeafNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(block3StorageBranchRootNode)).String(),
Content: block3StorageBranchRootNode,
},
{
CID: ipld.Keccak256ToCid(ipld.MEthStorageTrie, crypto.Keccak256(slot3StorageLeafNode)).String(),
Content: slot3StorageLeafNode,
},
},
},
},
}
for _, test := range tests {
test_helpers.RunStateSnapshot(t, chain.StateCache(), test, params)
}
}

View File

@ -503,7 +503,7 @@ func TestBuilder(t *testing.T) {
block3 = blocks[2]
params := statediff.Params{}
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testEmptyDiff",
statediff.Args{
@ -795,7 +795,7 @@ func TestBuilder(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
@ -817,7 +817,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
}
params.ComputeWatchedAddressesLeafPaths()
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testEmptyDiff",
statediff.Args{
@ -1009,7 +1009,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
@ -1028,7 +1028,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
block6 = blocks[5]
params := statediff.Params{}
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
// blocks 0-3 are the same as in TestBuilderWithIntermediateNodes
{
"testBlock4",
@ -1260,7 +1260,7 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
@ -1281,7 +1281,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
}
params.ComputeWatchedAddressesLeafPaths()
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testBlock4",
statediff.Args{
@ -1395,7 +1395,7 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
@ -1416,7 +1416,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
}
params.ComputeWatchedAddressesLeafPaths()
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testBlock4",
statediff.Args{
@ -1599,7 +1599,7 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
@ -1700,7 +1700,7 @@ func TestBuilderWithMovedAccount(t *testing.T) {
block2 = blocks[1]
params := statediff.Params{}
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testBlock1",
statediff.Args{
@ -1827,7 +1827,7 @@ func TestBuilderWithMovedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode,
@ -2088,7 +2088,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
block3 = blocks[2]
params := statediff.Params{}
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testEmptyDiff",
statediff.Args{
@ -2354,7 +2354,7 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,
@ -2377,7 +2377,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
}
params.ComputeWatchedAddressesLeafPaths()
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
{
"testEmptyDiff",
statediff.Args{
@ -2556,7 +2556,7 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,

View File

@ -48,16 +48,16 @@ type Config struct {
SubtrieWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain?
WaitForSync bool
// Context used during DB initialization
// Context passed to all DB method calls
Context context.Context
}
// Params contains config parameters for the state diff builder
type Params struct {
IncludeBlock bool
IncludeBlock bool // TODO: not used in write-requests
IncludeReceipts bool
IncludeTD bool
IncludeCode bool
IncludeCode bool // TODO: not used by anything?
WatchedAddresses []common.Address
watchedAddressesLeafPaths [][]byte
}

4
go.mod
View File

@ -124,8 +124,8 @@ require (
)
replace (
github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.2
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.3.1
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
)

8
go.sum
View File

@ -1,8 +1,8 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag=
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.2 h1:PdMR5B9wrQSYuYpFhN+9Kc8AEZ0pTt5eKCmu8oCtFcY=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.2/go.mod h1:OvXbdWbZ5viBXC/Ui1EkhsSmGB+AUX+TjGa3UDAfjfg=
git.vdb.to/cerc-io/eth-testing v0.3.1 h1:sPnlMev6oEgTjsW7GtUkSsjKNG/+X6P9q0izSejLGpM=
git.vdb.to/cerc-io/eth-testing v0.3.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1/go.mod h1:cYXZu70+6xmDgIgrTD81GPasv16piiAFJnKyAbwVPMU=
git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46 h1:KYcbbne/RXd7AuxbUd/3hgk1jPN+33k2CKiNsUsMCC0=

View File

@ -33,7 +33,16 @@ import (
)
// NewStateDiffIndexer creates and returns an implementation of the StateDiffIndexer interface.
func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, nodeInfo node.Info, config interfaces.Config) (sql.Database, interfaces.StateDiffIndexer, error) {
func NewStateDiffIndexer(
ctx context.Context,
chainConfig *params.ChainConfig,
nodeInfo node.Info,
config interfaces.Config,
) (
sql.Database,
interfaces.StateDiffIndexer,
error,
) {
switch config.Type() {
case shared.FILE:
log.Info("Starting statediff service in SQL file writing mode")
@ -41,8 +50,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
if !ok {
return nil, nil, fmt.Errorf("file config is not the correct type: got %T, expected %T", config, file.Config{})
}
fc.NodeInfo = nodeInfo
ind, err := file.NewStateDiffIndexer(chainConfig, fc)
ind, err := file.NewStateDiffIndexer(chainConfig, fc, nodeInfo)
return nil, ind, err
case shared.POSTGRES:
log.Info("Starting statediff service in Postgres writing mode")

View File

@ -19,26 +19,55 @@ package dump
import (
"fmt"
"io"
"math/big"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
submit func(blockTx *BatchTx, err error) error
blockNum string
dump io.Writer
quit chan struct{}
iplds chan models.IPLDModel
ipldCache models.IPLDBatch
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
// NewBatch builds a BatchTx for the given block number whose output is
// written to dest. It allocates the batch's unbuffered iplds and quit
// channels and launches the batch's cache method in its own goroutine
// before returning.
func NewBatch(number *big.Int, dest io.Writer) *BatchTx {
	tx := new(BatchTx)
	tx.blockNum = number.String()
	tx.dump = dest
	tx.iplds = make(chan models.IPLDModel)
	tx.quit = make(chan struct{})
	tx.ipldCache = models.IPLDBatch{}

	go tx.cache()
	return tx
}
// Submit closes the batch's quit and iplds channels and then flushes the
// batch, returning whatever error flush reports (nil on success).
//
// NOTE(review): both channels are closed unconditionally, so calling Submit
// twice on the same batch would panic on the second close — confirm callers
// submit each batch at most once.
func (tx *BatchTx) Submit() error {
	close(tx.quit)
	close(tx.iplds)
	return tx.flush()
}
// BlockNumber returns the block number this batch was created for, in the
// string form stored on the batch.
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
// RollbackOnFailure logs a panic or error that occurred before the batch was
// submitted. This batch type has nothing to roll back, so the failure is only
// reported; a recovered panic is re-raised after being logged.
//
// recover is called directly in this method, so a panic is only observed when
// RollbackOnFailure itself is the deferred call.
func (tx *BatchTx) RollbackOnFailure(err error) {
	recovered := recover()
	if recovered != nil {
		log.Info("panic detected before tx submission, but rollback not supported", "panic", recovered)
		panic(recovered)
	}
	if err != nil {
		log.Info("error detected before tx submission, but rollback not supported", "error", err)
	}
}
func (tx *BatchTx) flush() error {
@ -65,7 +94,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -73,7 +102,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}

View File

@ -17,6 +17,7 @@
package dump
import (
"context"
"encoding/hex"
"fmt"
"io"
@ -37,7 +38,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -62,7 +62,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -74,7 +74,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -91,49 +91,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
BlockNumber: block.Number().String(),
dump: sdi.dump,
iplds: make(chan models.IPLDModel),
quit: make(chan struct{}),
ipldCache: models.IPLDBatch{},
submit: func(self *BatchTx, err error) error {
close(self.quit)
close(self.iplds)
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
if err := self.flush(); err != nil {
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
}
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
blockTx := NewBatch(block.Number(), sdi.dump)
t = time.Now()
// Publish and index header, collect headerID
var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err = sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -167,9 +135,17 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err
}
// processHeader publishes and indexes a header IPLD in Postgres
// PushHeader publishes and indexes a header IPLD to the dump writer
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
tx, ok := batch.(*BatchTx)
if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
tx.cacheIPLD(headerNode)
headerID := header.Hash().String()
@ -189,7 +165,7 @@ func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, he
Coinbase: header.Coinbase.String(),
Canonical: true,
}
_, err := fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
_, err = fmt.Fprintf(sdi.dump, "%+v\r\n", mod)
return headerID, err
}
@ -344,7 +320,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -352,7 +328,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -375,7 +351,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// short circuit if it is a Removed node
// this assumes the db has been initialized and a ipld.blocks entry for the Removed node is present
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -388,7 +364,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -432,6 +408,10 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
return nil, nil
}
// BeginTx returns a new batch for the given block number, created with
// NewBatch and writing to the indexer's dump destination. The context
// argument is ignored.
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return NewBatch(number, sdi.dump)
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dump.Close()

View File

@ -16,14 +16,29 @@
package file
import "github.com/cerc-io/plugeth-statediff/utils/log"
// BatchTx wraps a void with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
submit func(blockTx *BatchTx, err error) error
blockNum string
fileWriter FileWriter
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
// Submit flushes the batch's file writer. It always returns nil; any result
// of Flush is not inspected here.
func (tx *BatchTx) Submit() error {
tx.fileWriter.Flush()
return nil
}
// BlockNumber returns the block number this batch was created for, in the
// string form stored on the batch.
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
// RollbackOnFailure logs a panic or error that occurred before the batch was
// submitted. File output cannot be rolled back, so the failure is only
// reported; a recovered panic is re-raised after being logged.
//
// recover is called directly in this method, so a panic is only observed when
// RollbackOnFailure itself is the deferred call.
func (tx *BatchTx) RollbackOnFailure(err error) {
	recovered := recover()
	if recovered != nil {
		log.Info("panic detected before tx submission, but rollback not supported", "panic", recovered)
		panic(recovered)
	}
	if err != nil {
		log.Info("error detected before tx submission, but rollback not supported", "error", err)
	}
}

View File

@ -20,7 +20,6 @@ import (
"fmt"
"strings"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
)
@ -30,7 +29,6 @@ type Config struct {
OutputDir string
FilePath string
WatchedAddressesFilePath string
NodeInfo node.Info
}
// FileMode to explicitly type the mode of file writer we are using
@ -70,20 +68,11 @@ func (c Config) Type() shared.DBType {
return shared.FILE
}
var nodeInfo = node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
}
// CSVTestConfig config for unit tests
var CSVTestConfig = Config{
Mode: CSV,
OutputDir: "./statediffing_test",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
NodeInfo: nodeInfo,
}
// SQLTestConfig config for unit tests
@ -91,5 +80,4 @@ var SQLTestConfig = Config{
Mode: SQL,
FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: nodeInfo,
}

View File

@ -43,7 +43,7 @@ func setupLegacyCSVIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig)
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.CSVTestConfig, test.LegacyNodeInfo)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupCSVIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig)
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.CSVTestConfig, test.LegacyNodeInfo)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -17,6 +17,7 @@
package file
import (
"context"
"errors"
"fmt"
"math/big"
@ -37,6 +38,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
@ -61,7 +63,7 @@ type StateDiffIndexer struct {
}
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config) (*StateDiffIndexer, error) {
func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config, nodeInfo node.Info) (*StateDiffIndexer, error) {
var err error
var writer FileWriter
@ -114,12 +116,12 @@ func NewStateDiffIndexer(chainConfig *params.ChainConfig, config Config) (*State
wg := new(sync.WaitGroup)
writer.Loop()
writer.upsertNode(config.NodeInfo)
writer.upsertNode(nodeInfo)
return &StateDiffIndexer{
fileWriter: writer,
chainConfig: chainConfig,
nodeID: config.NodeInfo.ID,
nodeID: nodeInfo.ID,
wg: wg,
}, nil
}
@ -130,7 +132,7 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
@ -142,7 +144,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
}
@ -159,32 +161,19 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
}
t = time.Now()
blockTx := &BatchTx{
BlockNumber: block.Number().String(),
submit: func(self *BatchTx, err error) error {
tDiff := time.Since(t)
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
t = time.Now()
sdi.fileWriter.Flush()
tDiff = time.Since(t)
metrics.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
log.Trace(traceMsg)
return err
},
blockNum: block.Number().String(),
fileWriter: sdi.fileWriter,
}
tDiff := time.Since(t)
metrics.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
t = time.Now()
// write header, collect headerID
headerID := sdi.processHeader(block.Header(), headerNode, reward, totalDifficulty)
tDiff = time.Since(t)
headerID, err := sdi.PushHeader(blockTx, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff := time.Since(t)
metrics.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
t = time.Now()
@ -217,9 +206,14 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return blockTx, err
}
// processHeader write a header IPLD insert SQL stmt to a file
// PushHeader writes a header IPLD insert SQL statement to a file
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
func (sdi *StateDiffIndexer) PushHeader(_ interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
// Process the header
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
headerID := header.Hash().String()
@ -240,7 +234,7 @@ func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld
Coinbase: header.Coinbase.String(),
Canonical: true,
})
return headerID
return headerID, nil
}
// processUncles publishes and indexes uncle IPLDs in Postgres
@ -374,20 +368,16 @@ func (sdi *StateDiffIndexer) processReceiptsAndTxs(args processArgs) error {
}
// PushStateNode writes a state diff node object (including any child storage nodes) IPLD insert SQL stmt to a file
func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
func (sdi *StateDiffIndexer) PushStateNode(tx interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
// publish the state node
var stateModel models.StateNodeModel
if stateNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStateCID, []byte{})
}
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -395,7 +385,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -415,10 +405,10 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), shared.RemovedNodeStorageCID, []byte{})
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -430,7 +420,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -445,12 +435,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
// PushIPLD writes iplds to ipld.blocks
func (sdi *StateDiffIndexer) PushIPLD(batch interfaces.Batch, ipld sdtypes.IPLD) error {
tx, ok := batch.(*BatchTx)
if !ok {
return fmt.Errorf("file: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
}
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, ipld.CID, ipld.Content)
// PushIPLD passes the IPLD's CID and content to the file writer, tagged with
// the batch's block number. Only the batch's BlockNumber method is used, so
// no concrete batch type is required. It always returns nil.
func (sdi *StateDiffIndexer) PushIPLD(tx interfaces.Batch, ipld sdtypes.IPLD) error {
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber(), ipld.CID, ipld.Content)
return nil
}
@ -472,6 +458,13 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return false, nil
}
// BeginTx returns a new BatchTx for the given block number that shares the
// indexer's file writer. The context argument is ignored.
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, _ context.Context) interfaces.Batch {
return &BatchTx{
blockNum: number.String(),
fileWriter: sdi.fileWriter,
}
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.fileWriter.Close()

View File

@ -83,7 +83,7 @@ func setupMainnetIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig)
ind, err = file.NewStateDiffIndexer(chainConf, file.CSVTestConfig, test.LegacyNodeInfo)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -44,7 +44,7 @@ func setupLegacySQLIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig)
ind, err = file.NewStateDiffIndexer(test.LegacyConfig, file.SQLTestConfig, test.LegacyNodeInfo)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -41,7 +41,7 @@ func setupIndexer(t *testing.T) {
require.NoError(t, err)
}
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig)
ind, err = file.NewStateDiffIndexer(mocks.TestChainConfig, file.SQLTestConfig, test.LegacyNodeInfo)
require.NoError(t, err)
db, err = postgres.SetupSQLXDB()

View File

@ -18,11 +18,14 @@ package sql
import (
"context"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/lib/pq"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/utils/log"
@ -32,7 +35,7 @@ const startingCacheCapacity = 1024 * 24
// BatchTx wraps a sql tx with the state necessary for building the tx concurrently during trie difference iteration
type BatchTx struct {
BlockNumber string
blockNum string
ctx context.Context
dbtx Tx
stm string
@ -42,13 +45,68 @@ type BatchTx struct {
removedCacheFlag *uint32
// Tracks expected cache size and ensures cache is caught up before flush
cacheWg sync.WaitGroup
submit func(blockTx *BatchTx, err error) error
}
// Submit satisfies indexer.AtomicTx
func (tx *BatchTx) Submit(err error) error {
return tx.submit(tx, err)
// NewBatch builds a BatchTx for the given block number on top of the database
// transaction tx. insertStm is stored as the batch's statement and ctx as its
// context. The IPLD cache slices are pre-allocated with
// startingCacheCapacity, and the batch's cache method is started in its own
// goroutine before the batch is returned.
func NewBatch(insertStm string, ctx context.Context, number *big.Int, tx Tx) *BatchTx {
	ipldCache := models.IPLDBatch{
		BlockNumbers: make([]string, 0, startingCacheCapacity),
		Keys:         make([]string, 0, startingCacheCapacity),
		Values:       make([][]byte, 0, startingCacheCapacity),
	}

	batch := &BatchTx{
		blockNum:         number.String(),
		ctx:              ctx,
		dbtx:             tx,
		stm:              insertStm,
		iplds:            make(chan models.IPLDModel),
		quit:             make(chan (chan<- struct{})),
		ipldCache:        ipldCache,
		removedCacheFlag: new(uint32),
	}

	go batch.cache()
	return batch
}
// Submit flushes the batch and commits the underlying database transaction.
// If the flush fails, the transaction is rolled back and the flush error is
// returned; otherwise the result of Commit is returned. The time from the
// start of the flush to the end of the commit is recorded in
// PostgresCommitTimer. The batch is closed on every return path.
func (tx *BatchTx) Submit() error {
	defer tx.close()

	start := time.Now()
	flushErr := tx.flush()
	if flushErr != nil {
		rollback(tx.ctx, tx.dbtx)
		return flushErr
	}

	commitErr := tx.dbtx.Commit(tx.ctx)
	metrics.IndexerMetrics.PostgresCommitTimer.Update(time.Since(start))
	return commitErr
}
// BlockNumber returns the block number this batch was created for, in the
// string form stored on the batch.
func (tx *BatchTx) BlockNumber() string {
return tx.blockNum
}
// RollbackOnFailure rolls back the database transaction when a panic or a
// non-nil err occurred before the batch was submitted, then closes the batch.
// A recovered panic is re-raised after the rollback. When there is neither a
// panic nor an error, it does nothing.
//
// recover is called directly in this method, so a panic is only observed when
// RollbackOnFailure itself is the deferred call.
func (tx *BatchTx) RollbackOnFailure(err error) {
	recovered := recover()
	if recovered == nil && err == nil {
		return
	}

	// Deferred so the batch is closed after the rollback, including while
	// the panic below is propagating.
	defer tx.close()

	if recovered != nil {
		log.Info("panic detected before tx submission, rolling back the tx", "panic", recovered)
		rollback(tx.ctx, tx.dbtx)
		panic(recovered)
	}

	log.Info("error detected before tx submission, rolling back the tx", "error", err)
	rollback(tx.ctx, tx.dbtx)
}
// close shuts down the batch's quit and iplds channels. It returns
// immediately if quit has already been set to nil by an earlier call.
func (tx *BatchTx) close() {
if tx.quit == nil {
return
}
// Send a confirmation channel over quit and wait on it before closing
// iplds. Presumably the receiver is the cache goroutine started in NewBatch,
// which signals confirm once it has stopped — confirm against cache().
confirm := make(chan struct{})
tx.quit <- confirm
close(tx.quit)
<-confirm
close(tx.iplds)
// Mark the batch as closed so later calls take the early return above.
tx.quit = nil
}
func (tx *BatchTx) flush() error {
@ -92,7 +150,7 @@ func (tx *BatchTx) cache() {
func (tx *BatchTx) cacheDirect(key string, value []byte) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}
@ -101,7 +159,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
func (tx *BatchTx) cacheIPLD(i ipld.IPLD) {
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: i.Cid().String(),
Data: i.RawData(),
}
@ -112,7 +170,7 @@ func (tx *BatchTx) cacheRemoved(key string, value []byte) {
atomic.StoreUint32(tx.removedCacheFlag, 1)
tx.cacheWg.Add(1)
tx.iplds <- models.IPLDModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
Key: key,
Data: value,
}

View File

@ -39,7 +39,6 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
var _ interfaces.StateDiffIndexer = &StateDiffIndexer{}
@ -82,24 +81,26 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
t := time.Now()
blockHash := block.Hash()
height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions()
var err error
// Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
return nil, err
}
// Generate the block iplds
headerNode, txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
txNodes, rctNodes, logNodes, err := ipld.FromBlockAndReceipts(block, receipts)
if err != nil {
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %v", err)
return nil, fmt.Errorf("error creating IPLD nodes from block and receipts: %w", err)
}
if len(txNodes) != len(rctNodes) {
return nil, fmt.Errorf("expected number of transactions (%d), receipts (%d)", len(txNodes), len(rctNodes))
return nil, fmt.Errorf("expected number of transactions (%d) does not match number of receipts (%d)",
len(txNodes), len(rctNodes))
}
// Calculate reward
@ -108,99 +109,35 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if sdi.chainConfig.Clique != nil {
reward = big.NewInt(0)
} else {
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), block.Transactions(), receipts)
reward = shared.CalcEthBlockReward(block.Header(), block.Uncles(), transactions, receipts)
}
t = time.Now()
// Begin new DB tx for everything
tx := NewDelayedTx(sdi.dbWriter.db)
defer func() {
if p := recover(); p != nil {
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
rollback(sdi.ctx, tx)
}
}()
blockTx := &BatchTx{
removedCacheFlag: new(uint32),
ctx: sdi.ctx,
BlockNumber: block.Number().String(),
stm: sdi.dbWriter.db.InsertIPLDsStm(),
iplds: make(chan models.IPLDModel),
quit: make(chan (chan<- struct{})),
ipldCache: models.IPLDBatch{
BlockNumbers: make([]string, 0, startingCacheCapacity),
Keys: make([]string, 0, startingCacheCapacity),
Values: make([][]byte, 0, startingCacheCapacity),
},
dbtx: tx,
// handle transaction commit or rollback for any return case
submit: func(self *BatchTx, err error) error {
defer func() {
confirm := make(chan struct{})
self.quit <- confirm
close(self.quit)
<-confirm
close(self.iplds)
}()
if p := recover(); p != nil {
log.Info("panic detected before tx submission, rolling back the tx", "panic", p)
rollback(sdi.ctx, tx)
panic(p)
} else if err != nil {
log.Info("error detected before tx submission, rolling back the tx", "error", err)
rollback(sdi.ctx, tx)
} else {
tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now()
if err := self.flush(); err != nil {
rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
},
}
go blockTx.cache()
tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now()
batch := NewBatch(
sdi.dbWriter.db.InsertIPLDsStm(), sdi.ctx,
block.Number(),
NewDelayedTx(sdi.dbWriter.db),
)
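// handle transaction rollback for failures in this scope
// NOTE(review): the arguments of a deferred call are evaluated when the defer statement
// executes, so err appears to be captured here with its current (nil) value and failures
// occurring later in this function would not reach RollbackOnFailure. Consider
// `defer func() { batch.RollbackOnFailure(err) }()` and confirm err is not shadowed below.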
defer batch.RollbackOnFailure(err)
// Publish and index header, collect headerID
var headerID string
headerID, err = sdi.processHeader(blockTx, block.Header(), headerNode, reward, totalDifficulty)
headerID, err := sdi.PushHeader(batch, block.Header(), reward, totalDifficulty)
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
err = sdi.processUncles(batch, headerID, block.Number(), block.UncleHash(), block.Uncles())
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(time.Since(t))
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
err = sdi.processReceiptsAndTxs(batch, processArgs{
headerID: headerID,
blockNumber: block.Number(),
receipts: receipts,
@ -212,12 +149,9 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
if err != nil {
return nil, err
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now()
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(time.Since(t))
return blockTx, err
return batch, err
}
// CurrentBlock returns the HeaderModel of the highest existing block in the database.
@ -230,9 +164,18 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
return sdi.dbWriter.detectGaps(beginBlockNumber, endBlockNumber)
}
// processHeader publishes and indexes a header IPLD in Postgres
// PushHeader publishes and indexes a header IPLD in Postgres
// it returns the headerID
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
tx, ok := batch.(*BatchTx)
if !ok {
return "", fmt.Errorf("sql: batch is expected to be of type %T, got %T", &BatchTx{}, batch)
telackey marked this conversation as resolved
Review

Whoa, does this happen? Why?

Whoa, does this happen? Why?
Review

Haha, it shouldn't and hasn't, but it's technically possible for someone to create an indexer of a different type and pass the wrong batch in. That would just panic otherwise, so the error is more to document the fact that this batch type is covariant with the indexer.

Haha, it shouldn't and hasn't, but it's technically possible for someone to create an indexer of a different type and pass the wrong batch in. That would just panic otherwise, so the error is more to document the fact that this batch type is covariant with the indexer.
Review

Oh good. Absolutely best always to check, I was just shocked for a moment to think it might be something that really happens.

Oh good. Absolutely best always to check, I was just shocked for a moment to think it might be something that really happens.
}
// Process the header
headerNode, err := ipld.NewEthHeader(header)
if err != nil {
return "", err
}
tx.cacheIPLD(headerNode)
headerID := header.Hash().String()
@ -406,7 +349,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if stateNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStateCID, []byte{})
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: shared.RemovedNodeStateCID,
@ -414,7 +357,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
}
} else {
stateModel = models.StateNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
CID: stateNode.AccountWrapper.CID,
@ -436,7 +379,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
if storageNode.Removed {
tx.cacheRemoved(shared.RemovedNodeStorageCID, []byte{})
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -450,7 +393,7 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
continue
}
storageModel := models.StorageNodeModel{
BlockNumber: tx.BlockNumber,
BlockNumber: tx.BlockNumber(),
HeaderID: headerID,
StateKey: common.BytesToHash(stateNode.AccountWrapper.LeafKey).String(),
StorageKey: common.BytesToHash(storageNode.LeafKey).String(),
@ -481,6 +424,15 @@ func (sdi *StateDiffIndexer) HasBlock(hash common.Hash, number uint64) (bool, er
return sdi.dbWriter.hasHeader(hash, number)
}
// BeginTx creates a new batch for the block with the given number. The batch is
// built by NewBatch from the database's IPLD insert statement, the supplied
// context, the block number, and a fresh DelayedTx wrapping the same database.
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
	db := sdi.dbWriter.db
	insertStm := db.InsertIPLDsStm()
	return NewBatch(insertStm, ctx, number, NewDelayedTx(db))
}
// Close satisfies io.Closer
func (sdi *StateDiffIndexer) Close() error {
return sdi.dbWriter.Close()

View File

@ -3,7 +3,9 @@ package sql
import (
"context"
"reflect"
"time"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/utils/log"
)
@ -69,10 +71,12 @@ func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{})
}
func (tx *DelayedTx) Commit(ctx context.Context) error {
t := time.Now()
base, err := tx.db.Begin(ctx)
if err != nil {
return err
}
metrics.IndexerMetrics.FreePostgresTimer.Update(time.Since(t))
defer func() {
if p := recover(); p != nil {
rollback(ctx, base)

View File

@ -44,10 +44,6 @@ type Config struct {
ConnTimeout time.Duration
LogStatements bool
// node info params
ID string
ClientName string
// driver type
Driver DriverType

View File

@ -17,6 +17,7 @@
package interfaces
import (
"context"
"math/big"
"time"
@ -34,10 +35,13 @@ type StateDiffIndexer interface {
CurrentBlock() (*models.HeaderModel, error)
HasBlock(hash common.Hash, number uint64) (bool, error)
PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (Batch, error)
PushHeader(batch Batch, header *types.Header, reward, td *big.Int) (string, error)
PushStateNode(tx Batch, stateNode sdtypes.StateLeafNode, headerID string) error
PushIPLD(tx Batch, ipld sdtypes.IPLD) error
ReportDBMetrics(delay time.Duration, quit <-chan bool)
BeginTx(number *big.Int, ctx context.Context) Batch
// Methods used by WatchAddress API/functionality
LoadWatchedAddresses() ([]common.Address, error)
InsertWatchedAddresses(addresses []sdtypes.WatchAddressArg, currentBlock *big.Int) error
@ -50,7 +54,12 @@ type StateDiffIndexer interface {
// Batch required for indexing data atomically
type Batch interface {
Submit(err error) error
// Submit commits the batch transaction
Submit() error
// BlockNumber is the block number of the header this batch contains
BlockNumber() string
// RollbackOnFailure rolls back the batch transaction if the error is not nil
RollbackOnFailure(error)
}
// Config used to configure different underlying implementations

View File

@ -22,23 +22,17 @@ import (
// FromBlockAndReceipts takes a block and processes it
// to return it a set of IPLD nodes for further processing.
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) (*EthHeader, []*EthTx, []*EthReceipt, [][]*EthLog, error) {
// Process the header
headerNode, err := NewEthHeader(block.Header())
if err != nil {
return nil, nil, nil, nil, err
}
func FromBlockAndReceipts(block *types.Block, receipts []*types.Receipt) ([]*EthTx, []*EthReceipt, [][]*EthLog, error) {
// Process the txs
txNodes, err := processTransactions(block.Transactions())
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, err
}
// Process the receipts and logs
rctNodes, logNodes, err := processReceiptsAndLogs(receipts)
return headerNode, txNodes, rctNodes, logNodes, err
return txNodes, rctNodes, logNodes, err
}
// processTransactions will take the found transactions in a parsed block body

View File

@ -92,7 +92,7 @@ func loadBlockData(t *testing.T) []testCase {
func TestFromBlockAndReceipts(t *testing.T) {
testCases := loadBlockData(t)
for _, tc := range testCases {
_, _, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts)
_, _, _, err := FromBlockAndReceipts(tc.block, tc.receipts)
if err != nil {
t.Fatalf("error generating IPLDs from block and receipts, err %v, kind %s, block hash %s", err, tc.kind, tc.block.Hash())
}

7
indexer/reexport.go Normal file
View File

@ -0,0 +1,7 @@
package indexer
import "github.com/cerc-io/plugeth-statediff/indexer/interfaces"
// Aliases re-exporting the indexer interface types from indexer/interfaces
// under the indexer package.
type (
	// Indexer is an alias for interfaces.StateDiffIndexer.
	Indexer = interfaces.StateDiffIndexer
	// Batch is an alias for interfaces.Batch.
	Batch = interfaces.Batch
	// Config is an alias for interfaces.Config.
	Config = interfaces.Config
)

View File

@ -43,7 +43,8 @@ var testHeaderTable = Table{
"mh_key",
"times_validated",
"coinbase",
)}
),
}
func TestTable(t *testing.T) {
headerUpsert := `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)`

View File

@ -28,7 +28,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
@ -48,7 +47,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
t.Fatal(err)
}
defer func() {
roysc marked this conversation as resolved
Review

Do you recall why we passed err in to Submit() before? I have a vague memory buzzing in the back of my mind and I want to double check that there is no longer any reason.

Do you recall why we passed err in to Submit() before? I have a vague memory buzzing in the back of my mind and I want to double check that there is no longer any reason.
Review

Basically it was due to handling rollbacks and commit in the submit function, so it needed the error to check whether to rollback. By factoring the rollback out, we avoid that, but now the caller is responsible for it.

Which reminds me that I need to defer a rollback call from the scope of service.writeStateDiff - since the tx is returned from PushBlock.

Basically it was due to handling rollbacks and commit in the submit function, so it needed the error to check whether to rollback. By factoring the rollback out, we avoid that, but now the caller is responsible for it. Which reminds me that I need to defer a rollback call from the scope of `service.writeStateDiff` - since the tx is returned from `PushBlock`.
Review

If you've added the rollback, feel free to resolve this conversation.

If you've added the rollback, feel free to resolve this conversation.
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -61,11 +60,7 @@ func SetupTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber())
}
func DoTestPublishAndIndexHeaderIPLDs(t *testing.T, db sql.Database) {
@ -547,13 +542,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx1.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx1.(*file.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx1.BlockNumber())
if err := tx1.Submit(err); err != nil {
if err := tx1.Submit(); err != nil {
t.Fatal(err)
}
@ -572,13 +563,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
} else if tx, ok := tx2.(*sql.BatchTx); ok {
require.Equal(t, mocks.BlockNumber.String(), tx.BlockNumber)
}
require.Equal(t, mocks.BlockNumber.String(), tx2.BlockNumber())
if err := tx2.Submit(err); err != nil {
if err := tx2.Submit(); err != nil {
t.Fatal(err)
}
@ -597,13 +584,9 @@ func SetupTestDataNonCanonical(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx3.(*sql.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx3.(*file.BatchTx); ok {
require.Equal(t, mocks.Block2Number.String(), batchTx.BlockNumber)
}
require.Equal(t, mocks.Block2Number.String(), tx3.BlockNumber())
if err := tx3.Submit(err); err != nil {
if err := tx3.Submit(); err != nil {
t.Fatal(err)
}
}

View File

@ -20,11 +20,11 @@ import (
"context"
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/ipld"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
"github.com/cerc-io/plugeth-statediff/indexer/node"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/params"
"github.com/ipfs/go-cid"
@ -37,6 +37,14 @@ var (
legacyData = mocks.NewLegacyData(LegacyConfig)
mockLegacyBlock *types.Block
legacyHeaderCID cid.Cid
// Mainnet node info
LegacyNodeInfo = node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
}
)
func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
@ -51,7 +59,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -60,11 +68,7 @@ func SetupLegacyTestData(t *testing.T, ind interfaces.StateDiffIndexer) {
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, legacyData.BlockNumber.String(), batchTx.BlockNumber)
}
require.Equal(t, legacyData.BlockNumber.String(), tx.BlockNumber())
}
func TestLegacyIndexer(t *testing.T, db sql.Database) {

View File

@ -19,8 +19,6 @@ package test
import (
"testing"
"github.com/cerc-io/plugeth-statediff/indexer/database/file"
"github.com/cerc-io/plugeth-statediff/indexer/database/sql"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/mocks"
"github.com/ethereum/go-ethereum/core/types"
@ -36,7 +34,7 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
defer func() {
if err := tx.Submit(err); err != nil {
if err := tx.Submit(); err != nil {
t.Fatal(err)
}
}()
@ -45,9 +43,5 @@ func TestBlock(t *testing.T, ind interfaces.StateDiffIndexer, testBlock *types.B
require.NoError(t, err)
}
if batchTx, ok := tx.(*sql.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
} else if batchTx, ok := tx.(*file.BatchTx); ok {
require.Equal(t, testBlock.Number().String(), batchTx.BlockNumber)
}
require.Equal(t, testBlock.Number().String(), tx.BlockNumber())
}

View File

@ -173,8 +173,6 @@ func initConfig() {
case shared.FILE:
indexerConfig = fileConfig
case shared.POSTGRES:
dbConfig.ID = config.ID
dbConfig.ClientName = config.ClientName
indexerConfig = dbConfig
case shared.DUMP:
switch dbDumpDst {

View File

@ -47,8 +47,12 @@ func InitializeNode(stack core.Node, b core.Backend) {
ClientName: serviceConfig.ClientName,
}
var err error
_, indexer, err = ind.NewStateDiffIndexer(serviceConfig.Context,
adapt.ChainConfig(backend.ChainConfig()), info, serviceConfig.IndexerConfig)
_, indexer, err = ind.NewStateDiffIndexer(
serviceConfig.Context,
adapt.ChainConfig(backend.ChainConfig()),
info,
serviceConfig.IndexerConfig,
)
if err != nil {
log.Error("failed to construct indexer", "error", err)
}

View File

@ -444,7 +444,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
}
params := statediff.Params{}
var tests = []test_helpers.TestCase{
var tests = []test_helpers.DiffTestCase{
// note that block0 (genesis) has over 1000 nodes due to the pre-allocation for the crowd-sale
// it is not feasible to write a unit test of that size at this time
{
@ -624,7 +624,7 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.RunBuildStateDiff(t, chain.StateCache(), tests, params)
test_helpers.CheckedRoots{
block1: block1RootBranchNode,
block2: block2RootBranchNode,

View File

@ -817,11 +817,15 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
if params.IncludeReceipts {
receipts = sds.BlockChain.GetReceiptsByHash(block.Hash())
}
t := time.Now()
tx, err = sds.indexer.PushBlock(block, receipts, totalDifficulty)
if err != nil {
return err
}
defer tx.RollbackOnFailure(err)
// TODO: review/remove the need to sync here
var nodeMtx, ipldMtx sync.Mutex
nodeSink := func(node types2.StateLeafNode) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
@ -842,9 +846,12 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
BlockHash: block.Hash(),
BlockNumber: block.Number(),
}, params, nodeSink, ipldSink)
if err != nil {
return err
}
// TODO this anti-pattern needs to be sorted out eventually
if err = tx.Submit(err); err != nil {
metrics.IndexerMetrics.StateStoreCodeProcessingTimer.Update(time.Since(t))
if err = tx.Submit(); err != nil {
return fmt.Errorf("batch transaction submission failed: %w", err)
}
return nil

View File

@ -4,9 +4,13 @@ import (
"bytes"
"fmt"
"math/big"
"math/rand"
"path/filepath"
"sort"
"sync"
"testing"
"github.com/cerc-io/eth-iterator-utils/tracker"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
@ -17,12 +21,20 @@ import (
"github.com/stretchr/testify/require"
)
type TestCase struct {
var subtrieCounts = []uint{1, 8, 32}
type DiffTestCase struct {
Name string
Args statediff.Args
Expected *sdtypes.StateObject
}
// SnapshotTestCase describes a single state snapshot test run by RunStateSnapshot.
type SnapshotTestCase struct {
// Name is used as the prefix of the generated subtest names.
Name string
// StateRoot is the state root passed to the builder's WriteStateSnapshot.
StateRoot common.Hash
// Expected is the state object the collected nodes and IPLDs are compared against.
Expected *sdtypes.StateObject
}
type CheckedRoots map[*types.Block][]byte
// Replicates the statediff object, but indexes nodes by CID
@ -33,12 +45,11 @@ type normalizedStateDiff struct {
IPLDs map[string]sdtypes.IPLD
}
func RunBuilderTests(
func RunBuildStateDiff(
t *testing.T,
sdb state.Database,
tests []TestCase,
tests []DiffTestCase,
params statediff.Params,
subtrieCounts []uint,
) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, test := range tests {
@ -58,6 +69,79 @@ func RunBuilderTests(
}
}
// RunStateSnapshot runs the given snapshot test case once per entry of subtrieCounts,
// each as a separate subtest. Every subtest writes a state snapshot of test.StateRoot
// through a builder over sdb, collects the emitted state nodes and IPLDs, and requires
// that their normalized form equals the normalized test.Expected.
//
// Unless the expected diff has no nodes, the snapshot is first attempted with an IPLD
// appender that fails at a random point; that attempt must return an error and leave a
// recovery file behind, after which a second attempt with a non-failing appender must
// succeed.
func RunStateSnapshot(
t *testing.T,
sdb state.Database,
test SnapshotTestCase,
params statediff.Params,
) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, subtries := range subtrieCounts {
// Skip the recovery test for empty diffs
doRecovery := len(test.Expected.Nodes) != 0
t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) {
builder.SetSubtrieWorkers(subtries)
// Results are accumulated across both build attempts: items appended before the
// interrupted first attempt failed stay in these slices for the second attempt.
var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD
interrupt := randomInterrupt(len(test.Expected.IPLDs))
// failAt of -1 never matches a slice length, so the state appender never fails.
stateAppender := failingSyncedAppender(&stateNodes, -1)
ipldAppender := failingSyncedAppender(&iplds, interrupt)
recoveryFile := filepath.Join(t.TempDir(), "recovery.txt")
// build performs one snapshot attempt with a tracker bound to recoveryFile; it reads
// ipldAppender at call time, so reassigning the variable below affects later calls.
// NOTE(review): presumably the tracker persists iterator progress on CloseAndSave and
// restores it on the next New — confirm against eth-iterator-utils/tracker.
build := func() error {
tr := tracker.New(recoveryFile, subtries)
defer tr.CloseAndSave()
return builder.WriteStateSnapshot(
test.StateRoot, params, stateAppender, ipldAppender, tr,
)
}
if doRecovery {
// First attempt fails, second succeeds
if build() == nil {
t.Fatal("expected an error")
}
require.FileExists(t, recoveryFile)
}
// Swap in an IPLD appender that never fails for the final attempt.
ipldAppender = failingSyncedAppender(&iplds, -1)
if err := build(); err != nil {
t.Fatal(err)
}
diff := sdtypes.StateObject{
Nodes: stateNodes,
IPLDs: iplds,
}
require.Equal(t,
normalize(test.Expected),
normalize(&diff),
)
})
}
}
// failingSyncedAppender returns a mutex-guarded callback that appends items to
// the slice pointed to by to. When the slice currently holds exactly failAt
// items, the callback appends nothing and returns an error instead. A negative
// failAt can never equal a slice length, so such an appender never fails.
func failingSyncedAppender[T any](to *[]T, failAt int) func(T) error {
	var lock sync.Mutex
	appendItem := func(item T) error {
		lock.Lock()
		defer lock.Unlock()

		if held := len(*to); held != failAt {
			*to = append(*to, item)
			return nil
		}
		return fmt.Errorf("failing at %d items", failAt)
	}
	return appendItem
}
// randomInterrupt picks a pseudo-random interruption point for a sequence of N
// items. The result lies in the half-open range [N/4, N/4 + N/2), using
// integer division, i.e. roughly between one and three quarters of the way
// through. For N < 2 it always returns 0.
func randomInterrupt(N int) int {
	if N >= 2 {
		quarter, half := N/4, N/2
		return rand.Intn(half) + quarter
	}
	return 0
}
func (roots CheckedRoots) Check(t *testing.T) {
// Let's also confirm that our root state nodes form the state root hash in the headers
for block, node := range roots {

View File

@ -17,6 +17,7 @@
package mocks
import (
context "context"
"math/big"
"time"
@ -52,6 +53,10 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
return &batch{}, nil
}
// PushHeader is a no-op in this mock: it ignores its arguments and returns an
// empty header ID with a nil error.
func (sdi *StateDiffIndexer) PushHeader(batch interfaces.Batch, header *types.Header, reward, td *big.Int) (string, error) {
return "", nil
}
func (sdi *StateDiffIndexer) PushStateNode(txi interfaces.Batch, stateNode sdtypes.StateLeafNode, headerID string) error {
return nil
}
@ -80,10 +85,21 @@ func (sdi *StateDiffIndexer) ClearWatchedAddresses() error {
return nil
}
// BeginTx ignores its arguments and returns a new, empty mock batch.
func (sdi *StateDiffIndexer) BeginTx(number *big.Int, ctx context.Context) interfaces.Batch {
return &batch{}
}
func (sdi *StateDiffIndexer) Close() error {
return nil
}
func (tx *batch) Submit(err error) error {
// RollbackOnFailure is a no-op for the mock batch; the error is ignored.
func (tx *batch) RollbackOnFailure(error) {}
func (tx *batch) Submit() error {
return nil
}
// BlockNumber always returns "0" for the mock batch.
func (tx *batch) BlockNumber() string {
return "0"
}

View File

@ -53,7 +53,7 @@ type StateLeafNode struct {
StorageDiff []StorageLeafNode
}
// StorageLeafNode holds the data for a single storage diff node leaf node
// StorageLeafNode holds the data for a single storage diff leaf node
type StorageLeafNode struct {
Removed bool
Value []byte

View File

@ -7,24 +7,9 @@ import (
"github.com/ethereum/go-ethereum/trie"
)
type symmDiffIterator struct {
a, b iterState // Nodes returned are those in b - a and a - b (keys only)
yieldFromA bool // Whether next node comes from a
count int // Number of nodes scanned on either trie
eqPathIndex int // Count index of last pair of equal paths, to detect an updated key
}
// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric difference
// of elements in a and b, i.e., the elements in a that are not in b, and vice versa.
// Returns the iterator, and a pointer to an integer recording the number of nodes seen.
func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) (*symmDiffIterator, *int) {
it := &symmDiffIterator{
a: iterState{a, true},
b: iterState{b, true},
// common paths are detected by a distance <=1 from this index, so put it out of reach
eqPathIndex: -2,
}
return it, &it.count
type SymmDiffIterator struct {
a, b iterState // Nodes returned are those in b - a and a - b (keys only)
SymmDiffState
}
// pairs an iterator with a cache of its valid status
@ -33,66 +18,93 @@ type iterState struct {
valid bool
}
// SymmDiffState exposes state specific to symmetric difference iteration, which is not accessible
// from the NodeIterator interface. This includes the number of nodes seen, whether the current key
// is common to both A and B, and whether the current node is sourced from A or B.
type SymmDiffState struct {
yieldFromA bool // Whether next node comes from a
count int // Number of nodes scanned on either trie
eqPathIndex int // Count index of last pair of equal paths, to detect an updated key
}
// NewSymmetricDifferenceIterator constructs a trie.NodeIterator that iterates over the symmetric
// difference of elements in a and b, i.e., the elements in a that are not in b, and vice versa.
// The returned iterator embeds SymmDiffState, which gives access to the state that the
// NodeIterator interface does not expose: the number of nodes seen, whether the current node
// comes from a, and whether its path is common to both.
func NewSymmetricDifferenceIterator(a, b trie.NodeIterator) *SymmDiffIterator {
	// Common paths are detected by a distance <=1 between count and eqPathIndex,
	// so eqPathIndex starts at -2 to keep it out of reach.
	initial := SymmDiffState{eqPathIndex: -2}
	return &SymmDiffIterator{
		a:             iterState{NodeIterator: a, valid: true},
		b:             iterState{NodeIterator: b, valid: true},
		SymmDiffState: initial,
	}
}
func (st *iterState) Next(descend bool) bool {
st.valid = st.NodeIterator.Next(descend)
return st.valid
}
func (it *symmDiffIterator) curr() *iterState {
// FromA reports whether the current node is sourced from A (true) rather than
// from B (false).
func (it *SymmDiffState) FromA() bool {
return it.yieldFromA
}
// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it
// represents an updated node. This is the case when the node count is at most one past
// eqPathIndex, the count index recorded for the last pair of equal paths.
func (it *SymmDiffState) CommonPath() bool {
return it.count-it.eqPathIndex <= 1
}
// Count returns the number of nodes seen so far, counting nodes scanned on
// either trie.
func (it *SymmDiffState) Count() int {
return it.count
}
func (it *SymmDiffIterator) curr() *iterState {
if it.yieldFromA {
return &it.a
}
return &it.b
}
// FromA returns true if the current node is sourced from A.
func (it *symmDiffIterator) FromA() bool {
return it.yieldFromA
}
// CommonPath returns true if a node with the current path exists in each sub-iterator - i.e. it
// represents an updated node.
func (it *symmDiffIterator) CommonPath() bool {
return it.count-it.eqPathIndex <= 1
}
func (it *symmDiffIterator) Hash() common.Hash {
func (it *SymmDiffIterator) Hash() common.Hash {
return it.curr().Hash()
}
func (it *symmDiffIterator) Parent() common.Hash {
func (it *SymmDiffIterator) Parent() common.Hash {
return it.curr().Parent()
}
func (it *symmDiffIterator) Leaf() bool {
func (it *SymmDiffIterator) Leaf() bool {
return it.curr().Leaf()
}
func (it *symmDiffIterator) LeafKey() []byte {
func (it *SymmDiffIterator) LeafKey() []byte {
return it.curr().LeafKey()
}
func (it *symmDiffIterator) LeafBlob() []byte {
func (it *SymmDiffIterator) LeafBlob() []byte {
return it.curr().LeafBlob()
}
func (it *symmDiffIterator) LeafProof() [][]byte {
func (it *SymmDiffIterator) LeafProof() [][]byte {
return it.curr().LeafProof()
}
func (it *symmDiffIterator) Path() []byte {
func (it *SymmDiffIterator) Path() []byte {
return it.curr().Path()
}
func (it *symmDiffIterator) NodeBlob() []byte {
func (it *SymmDiffIterator) NodeBlob() []byte {
return it.curr().NodeBlob()
}
func (it *symmDiffIterator) AddResolver(resolver trie.NodeResolver) {
func (it *SymmDiffIterator) AddResolver(resolver trie.NodeResolver) {
panic("not implemented")
}
func (it *symmDiffIterator) Next(bool) bool {
func (it *SymmDiffIterator) Next(bool) bool {
// NodeIterators start in a "pre-valid" state, so the first Next advances to a valid node.
if it.count == 0 {
if it.a.Next(true) {
@ -110,7 +122,7 @@ func (it *symmDiffIterator) Next(bool) bool {
return it.a.valid || it.b.valid
}
func (it *symmDiffIterator) seek() {
func (it *SymmDiffIterator) seek() {
// Invariants:
// - At the end of the function, the sub-iterator with the lexically lesser path
// points to the next element
@ -151,7 +163,7 @@ func (it *symmDiffIterator) seek() {
}
}
func (it *symmDiffIterator) Error() error {
func (it *SymmDiffIterator) Error() error {
if err := it.a.Error(); err != nil {
return err
}
@ -172,3 +184,9 @@ func compareNodes(a, b trie.NodeIterator) int {
}
return 0
}
// AlwaysBState returns a placeholder SymmDiffState describing a diff against an
// empty A: every element is reported as coming from B, and with the zero node
// count no path is reported as common with A.
func AlwaysBState() SymmDiffState {
	// yieldFromA keeps its zero value (false), so FromA reports B.
	var state SymmDiffState
	// Same out-of-reach starting index used by NewSymmetricDifferenceIterator.
	state.eqPathIndex = -2
	return state
}

View File

@ -45,37 +45,33 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
t.Run("with no difference", func(t *testing.T) {
db := trie.NewDatabase(rawdb.NewMemoryDatabase())
triea := trie.NewEmpty(db)
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
di := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 0, *count)
assert.Equal(t, 0, di.Count())
triea.MustUpdate([]byte("foo"), []byte("bar"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), triea.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 2, *count)
// two nodes visited: the leaf (value) and its parent
assert.Equal(t, 2, di.Count())
// TODO: will fail until fixed by https://github.com/ethereum/go-ethereum/pull/27838
trieb := trie.NewEmpty(db)
di, count = utils.NewSymmetricDifferenceIterator(
triea.NodeIterator([]byte("jars")),
trieb.NodeIterator(nil))
di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator([]byte("jars")), trieb.NodeIterator(nil))
for di.Next(true) {
t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
t.Errorf("iterator should not yield any elements")
}
assert.Equal(t, 0, *count)
assert.Equal(t, 0, di.Count())
// // TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838
// di, count = utils.NewSymmetricDifferenceIterator(
// triea.NodeIterator([]byte("food")),
// trieb.NodeIterator(nil))
// TODO will fail until merged: https://github.com/ethereum/go-ethereum/pull/27838
// di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator([]byte("food")), trieb.NodeIterator(nil))
// for di.Next(true) {
// t.Errorf("iterator should not yield any elements, but got key %s", di.Path())
// t.Errorf("iterator should not yield any elements")
// }
// assert.Equal(t, 0, *count)
// assert.Equal(t, 0, di.Count())
})
t.Run("small difference", func(t *testing.T) {
@ -86,7 +82,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
trieb := trie.NewEmpty(dbb)
trieb.MustUpdate([]byte("foo"), []byte("bar"))
di, count := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
di := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
leaves := 0
for di.Next(true) {
if di.Leaf() {
@ -97,10 +93,10 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 2, *count)
assert.Equal(t, 2, di.Count())
trieb.MustUpdate([]byte("quux"), []byte("bars"))
di, count = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux")))
di = utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator([]byte("quux")))
leaves = 0
for di.Next(true) {
if di.Leaf() {
@ -111,7 +107,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
}
}
assert.Equal(t, 1, leaves)
assert.Equal(t, 1, *count)
assert.Equal(t, 1, di.Count())
})
dba := trie.NewDatabase(rawdb.NewMemoryDatabase())
@ -128,7 +124,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
onlyA := make(map[string]string)
onlyB := make(map[string]string)
var deletions, creations []string
it, _ := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
it := utils.NewSymmetricDifferenceIterator(triea.NodeIterator(nil), trieb.NodeIterator(nil))
for it.Next(true) {
if !it.Leaf() {
continue
@ -209,7 +205,7 @@ func TestCompareDifferenceIterators(t *testing.T) {
pathsA = append(pathsA, itAonly.Path())
}
itSym, _ := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
itSym := utils.NewSymmetricDifferenceIterator(treeA.NodeIterator(nil), treeB.NodeIterator(nil))
var idxA, idxB int
for itSym.Next(true) {
if itSym.FromA() {

View File

@ -17,6 +17,7 @@ func init() {
// The plugeth logger is only initialized with the geth runtime,
// but tests expect to have a logger available, so default to this.
DefaultLogger = TestLogger
TestLogger.SetLevel(int(log15.LvlInfo))
}
// Trace forwards the message m and the additional arguments a to
// DefaultLogger.Trace.
func Trace(m string, a ...interface{}) {
	DefaultLogger.Trace(m, a...)
}