Concurrent statediff iteration (#12)
Uses subtrie iterators to concurrently process the state trie. Reviewed-on: #12
Parent: 981bfb5895
Commit: 82131564ca
builder.go (87 changed lines)

@@ -21,14 +21,18 @@ package statediff
 
 import (
 	"bytes"
+	"context"
 	"fmt"
+	"sync"
 	"time"
 
+	iterutils "github.com/cerc-io/eth-iterator-utils"
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/rlp"
 	"github.com/ethereum/go-ethereum/trie"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/cerc-io/plugeth-statediff/adapt"
 	"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"

@@ -44,6 +48,8 @@ var (
 	emptyContractRoot = crypto.Keccak256Hash(emptyNode)
 	nullCodeHash = crypto.Keccak256([]byte{})
 	zeroHash common.Hash
+
+	defaultSubtrieWorkers uint = 1
 )
 
 // Builder interface exposes the method for building a state diff between two blocks

@@ -54,11 +60,8 @@ type Builder interface {
 
 type StateDiffBuilder struct {
 	// state cache is safe for concurrent reads
 	stateCache adapt.StateView
-}
-
-type iterPair struct {
-	Older, Newer trie.NodeIterator
+	subtrieWorkers uint
 }
 
 type accountUpdate struct {

@@ -74,19 +77,39 @@ func appender[T any](to *[]T) func(T) error {
 	}
 }
 
-// NewBuilder is used to create a statediff builder
-func NewBuilder(stateCache adapt.StateView) Builder {
-	return &StateDiffBuilder{
-		stateCache: stateCache,
+func syncedAppender[T any](to *[]T) func(T) error {
+	var mtx sync.Mutex
+	return func(a T) error {
+		mtx.Lock()
+		*to = append(*to, a)
+		mtx.Unlock()
+		return nil
 	}
 }
 
+// NewBuilder is used to create a statediff builder
+func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
+	return &StateDiffBuilder{
+		stateCache: stateCache,
+		subtrieWorkers: defaultSubtrieWorkers,
+	}
+}
+
+// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
+// Passing 0 will reset this to the default value.
+func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
+	if n == 0 {
+		n = defaultSubtrieWorkers
+	}
+	sdb.subtrieWorkers = n
+}
+
 // BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
 func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
 	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
 	var stateNodes []sdtypes.StateLeafNode
 	var iplds []sdtypes.IPLD
-	err := sdb.WriteStateDiff(args, params, appender(&stateNodes), appender(&iplds))
+	err := sdb.WriteStateDiff(args, params, syncedAppender(&stateNodes), syncedAppender(&iplds))
 	if err != nil {
 		return sdtypes.StateObject{}, err
 	}
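For context, this is roughly how the reworked builder API is meant to be driven. The sketch below is illustrative only: the builder, adapt, and Args/Params names come from the diff above, while the surrounding function and its arguments are hypothetical.

```go
package example

import (
	statediff "github.com/cerc-io/plugeth-statediff"
	"github.com/cerc-io/plugeth-statediff/adapt"
	sdtypes "github.com/cerc-io/plugeth-statediff/types"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/state"
)

// buildDiff is a hypothetical caller: it wraps a geth state.Database, asks for
// 8 parallel subtrie workers, and builds a diff between two state roots.
func buildDiff(sdb state.Database, oldRoot, newRoot, blockHash common.Hash) (sdtypes.StateObject, error) {
	builder := statediff.NewBuilder(adapt.GethStateView(sdb))
	builder.SetSubtrieWorkers(8) // passing 0 falls back to the package default
	return builder.BuildStateDiffObject(statediff.Args{
		OldStateRoot: oldRoot,
		NewStateRoot: newRoot,
		BlockHash:    blockHash,
	}, statediff.Params{})
}
```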
@@ -106,35 +129,39 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
 ) error {
 	defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
 	// Load tries for old and new states
-	oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
+	triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
 	if err != nil {
 		return fmt.Errorf("error opening old state trie: %w", err)
 	}
-	newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
+	trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
 	if err != nil {
 		return fmt.Errorf("error opening new state trie: %w", err)
 	}
+	subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
+	subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
 
-	iters := iterPair{
-		Older: oldTrie.NodeIterator(nil),
-		Newer: newTrie.NodeIterator(nil),
-	}
 	logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
-	err = sdb.processAccounts(
-		iters.Older, iters.Newer,
-		params.watchedAddressesLeafPaths,
-		nodeSink, ipldSink, logger)
-	if err != nil {
-		return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
+	// errgroup will cancel if any gr fails
+	g, ctx := errgroup.WithContext(context.Background())
+	for i := uint(0); i < sdb.subtrieWorkers; i++ {
+		func(subdiv uint) {
+			g.Go(func() error {
+				a, b := subitersA[subdiv], subitersB[subdiv]
+				return sdb.processAccounts(ctx,
+					a, b, params.watchedAddressesLeafPaths,
+					nodeSink, ipldSink, logger,
+				)
+			})
+		}(i)
 	}
-	return nil
+	return g.Wait()
 }
 
 // processAccounts processes account creations and deletions, and returns a set of updated
 // existing accounts, indexed by leaf key.
-func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
-	watchedAddressesLeafPaths [][]byte,
+func (sdb *StateDiffBuilder) processAccounts(
+	ctx context.Context,
+	a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte,
 	nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
 	logger log.Logger,
 ) error {
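The WriteStateDiff change above is an instance of a standard errgroup fan-out: one goroutine per subtrie, with the shared context cancelling the remaining workers as soon as any of them fails. A self-contained sketch of that pattern (generic names, not the repository's code):

```go
package example

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// fanOut runs process once per division, each on its own goroutine. Wait
// returns the first error; the derived ctx is cancelled so other workers can
// bail out early, which is what processAccounts polls for.
func fanOut(divisions uint, process func(ctx context.Context, subdiv uint) error) error {
	g, ctx := errgroup.WithContext(context.Background())
	for i := uint(0); i < divisions; i++ {
		subdiv := i // capture the loop variable for the closure
		g.Go(func() error {
			return process(ctx, subdiv)
		})
	}
	return g.Wait()
}
```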
@@ -146,7 +173,14 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
 	// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
 	var prevBlob []byte
 	it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
+	prevBlob = it.NodeBlob()
 	for it.Next(true) {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+
 		// ignore node if it is not along paths of interest
 		if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
 			continue
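The non-blocking select added at the top of the loop is the usual way to make a long iteration cancellable without paying a blocking channel receive per node; in isolation the pattern looks like this (hypothetical helper, not repository code):

```go
package example

import "context"

// forEach applies handle until next is exhausted, aborting promptly if ctx is
// cancelled (e.g. because a sibling subtrie worker returned an error).
func forEach(ctx context.Context, next func() bool, handle func() error) error {
	for next() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default: // context still live; keep going without blocking
		}
		if err := handle(); err != nil {
			return err
		}
	}
	return nil
}
```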
@@ -205,6 +239,7 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
 		if it.Hash() == zeroHash {
 			continue
 		}
+		// TODO - this can be handled when value node is (craeted?)
 		nodeVal := make([]byte, len(it.NodeBlob()))
 		copy(nodeVal, it.NodeBlob())
 		// if doing a selective diff, we need to ensure this is a watched path
@@ -795,13 +795,13 @@ func TestBuilder(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(),
-		tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block0: bankAccountAtBlock0LeafNode,
 		block1: block1BranchRootNode,
 		block2: block2BranchRootNode,
 		block3: block3BranchRootNode,
-	})
+	}.Check(t)
 }
 
 func TestBuilderWithWatchedAddressList(t *testing.T) {

@@ -1009,12 +1009,13 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block0: bankAccountAtBlock0LeafNode,
 		block1: block1BranchRootNode,
 		block2: block2BranchRootNode,
 		block3: block3BranchRootNode,
-	})
+	}.Check(t)
 }
 
 func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {

@@ -1259,11 +1260,12 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block4: block4BranchRootNode,
 		block5: block5BranchRootNode,
 		block6: block6BranchRootNode,
-	})
+	}.Check(t)
 }
 
 func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {

@@ -1393,11 +1395,12 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block4: block4BranchRootNode,
 		block5: block5BranchRootNode,
 		block6: block6BranchRootNode,
-	})
+	}.Check(t)
 }
 
 func TestBuilderWithRemovedWatchedAccount(t *testing.T) {

@@ -1596,11 +1599,12 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block4: block4BranchRootNode,
 		block5: block5BranchRootNode,
 		block6: block6BranchRootNode,
-	})
+	}.Check(t)
 }
 
 var (

@@ -1823,10 +1827,11 @@ func TestBuilderWithMovedAccount(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block1: block01BranchRootNode,
 		block2: bankAccountAtBlock02LeafNode,
-	})
+	}.Check(t)
 }
 
 /*

@@ -2349,11 +2354,12 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block1: block1bBranchRootNode,
 		block2: block2bBranchRootNode,
 		block3: block3bBranchRootNode,
-	})
+	}.Check(t)
 }
 
 func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {

@@ -2550,11 +2556,12 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
 		block1: block1bBranchRootNode,
 		block2: block2bBranchRootNode,
 		block3: block3bBranchRootNode,
-	})
+	}.Check(t)
 }
 
 /*
@@ -44,6 +44,8 @@ type Config struct {
 	BackfillCheckPastBlocks uint64
 	// Size of the worker pool
 	NumWorkers uint
+	// Number of subtries to iterate in parallel
+	SubtrieWorkers uint
 	// Should the statediff service wait until geth has synced to the head of the blockchain?
 	WaitForSync bool
 	// Context used during DB initialization
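The two settings are independent: NumWorkers sizes the service's worker pool, while SubtrieWorkers controls how many pieces each block's state trie is split into. A hedged configuration sketch (values are arbitrary, and only the two relevant Config fields are shown):

```go
package example

import statediff "github.com/cerc-io/plugeth-statediff"

// Illustrative values only: a pool of 4 workers for block processing, with
// each block's state trie split across 8 concurrently iterated subtries.
var cfg = statediff.Config{
	NumWorkers:     4,
	SubtrieWorkers: 8,
}
```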
go.mod (8 changed lines)

@@ -3,6 +3,7 @@ module github.com/cerc-io/plugeth-statediff
 go 1.19
 
 require (
+	github.com/cerc-io/eth-iterator-utils v0.1.1
 	github.com/cerc-io/eth-testing v0.2.1
 	github.com/ethereum/go-ethereum v1.11.6
 	github.com/georgysavva/scany v0.2.9

@@ -18,8 +19,9 @@ require (
 	github.com/openrelayxyz/plugeth-utils v1.2.0
 	github.com/pganalyze/pg_query_go/v4 v4.2.1
 	github.com/shopspring/decimal v1.2.0
-	github.com/stretchr/testify v1.8.1
-	github.com/thoas/go-funk v0.9.2
+	github.com/stretchr/testify v1.8.2
+	github.com/thoas/go-funk v0.9.3
+	golang.org/x/sync v0.1.0
 )
 
 require (

@@ -109,7 +111,6 @@ require (
 	golang.org/x/crypto v0.1.0 // indirect
 	golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
 	golang.org/x/net v0.8.0 // indirect
-	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.6.0 // indirect
 	golang.org/x/term v0.6.0 // indirect
 	golang.org/x/text v0.8.0 // indirect

@@ -123,6 +124,7 @@ require (
 )
 
 replace (
+	github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1
 	github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
 	github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
 	github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46
go.sum (10 changed lines)

@@ -1,4 +1,6 @@
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk=
+git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag=
 git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
 git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
 git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=

@@ -493,12 +495,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
-github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
-github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
-github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk=
-github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
+github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
+github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
 github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
 github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
 github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=
@@ -57,7 +57,7 @@ type StateDiffIndexer struct {
 	chainConfig *params.ChainConfig
 	nodeID string
 	wg *sync.WaitGroup
-	removedCacheFlag *uint32
+	removedCacheFlag uint32
 }
 
 // NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer

@@ -130,7 +130,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
 // PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
 // Returns an initiated DB transaction which must be Closed via defer to commit or rollback
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
-	sdi.removedCacheFlag = new(uint32)
 	start, t := time.Now(), time.Now()
 	blockHash := block.Hash()
 	blockHashStr := blockHash.String()

@@ -223,11 +222,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
 	sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
 
-	var baseFee *string
-	if header.BaseFee != nil {
-		baseFee = new(string)
-		*baseFee = header.BaseFee.String()
-	}
 	headerID := header.Hash().String()
 	sdi.fileWriter.upsertHeaderCID(models.HeaderModel{
 		NodeIDs: pq.StringArray([]string{sdi.nodeID}),

@@ -388,8 +382,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 	// publish the state node
 	var stateModel models.StateNodeModel
 	if stateNode.Removed {
-		if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
-			atomic.StoreUint32(sdi.removedCacheFlag, 1)
+		if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
+			atomic.StoreUint32(&sdi.removedCacheFlag, 1)
 			sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
 		}
 		stateModel = models.StateNodeModel{

@@ -419,8 +413,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
 	// if there are any storage nodes associated with this node, publish and index them
 	for _, storageNode := range stateNode.StorageDiff {
 		if storageNode.Removed {
-			if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
-				atomic.StoreUint32(sdi.removedCacheFlag, 1)
+			if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
+				atomic.StoreUint32(&sdi.removedCacheFlag, 1)
 				sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
 			}
 			storageModel := models.StorageNodeModel{
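The removedCacheFlag field is now a plain uint32 whose address is passed to sync/atomic at the call sites (&sdi.removedCacheFlag). The write-once-guard idea in isolation (sketch only; a strictly race-free variant would use CompareAndSwap):

```go
package example

import "sync/atomic"

// onceFlag mirrors how removedCacheFlag is used above: the first caller that
// observes zero performs some one-time work (in the diff, upserting a
// "removed node" placeholder IPLD) and flips the flag for everyone else.
type onceFlag struct {
	done uint32
}

func (f *onceFlag) doOnce(work func()) {
	// Load-then-Store matches the diff; atomic.CompareAndSwapUint32(&f.done, 0, 1)
	// would close the small window where two goroutines both observe zero.
	if atomic.LoadUint32(&f.done) == 0 {
		atomic.StoreUint32(&f.done, 1)
		work()
	}
}
```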
@@ -84,9 +84,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
 func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
 	start, t := time.Now(), time.Now()
 	blockHash := block.Hash()
-	blockHashStr := blockHash.String()
 	height := block.NumberU64()
-	traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
+	traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
 	transactions := block.Transactions()
 	// Derive any missing fields
 	if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {

@@ -155,20 +154,20 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 			} else {
 				tDiff := time.Since(t)
 				metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
-				traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
+				traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
 				t = time.Now()
 				if err := self.flush(); err != nil {
 					rollback(sdi.ctx, tx)
-					traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
+					traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
 					log.Debug(traceMsg)
 					return err
 				}
 				err = tx.Commit(sdi.ctx)
 				tDiff = time.Since(t)
 				metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
-				traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
+				traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
 			}
-			traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
+			traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
 			log.Debug(traceMsg)
 			return err
 		},

@@ -178,7 +177,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	tDiff := time.Since(t)
 	metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
 
-	traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
+	traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
 	t = time.Now()
 
 	// Publish and index header, collect headerID

@@ -189,7 +188,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	}
 	tDiff = time.Since(t)
 	metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
-	traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
+	traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
 	t = time.Now()
 	// Publish and index uncles
 	err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())

@@ -198,7 +197,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	}
 	tDiff = time.Since(t)
 	metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
-	traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
+	traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
 	t = time.Now()
 	// Publish and index receipts and txs
 	err = sdi.processReceiptsAndTxs(blockTx, processArgs{

@@ -215,7 +214,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
 	}
 	tDiff = time.Since(t)
 	metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
-	traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
+	traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
 	t = time.Now()
 
 	return blockTx, err

@@ -236,11 +235,6 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
 func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
 	tx.cacheIPLD(headerNode)
 
-	var baseFee *string
-	if header.BaseFee != nil {
-		baseFee = new(string)
-		*baseFee = header.BaseFee.String()
-	}
 	headerID := header.Hash().String()
 	// index header
 	return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{
@@ -58,18 +58,6 @@ type Statements interface {
 	InsertStorageStm() string
 	InsertIPLDStm() string
 	InsertIPLDsStm() string
-
-	// Table/column descriptions for use with CopyFrom and similar commands.
-	LogTableName() []string
-	LogColumnNames() []string
-	RctTableName() []string
-	RctColumnNames() []string
-	StateTableName() []string
-	StateColumnNames() []string
-	StorageTableName() []string
-	StorageColumnNames() []string
-	TxTableName() []string
-	TxColumnNames() []string
 }
 
 // Tx interface to accommodate different concrete SQL transaction types
@@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string {
 func (db *DB) InsertIPLDsStm() string {
 	return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
 }
-
-func (db *DB) LogTableName() []string {
-	return []string{"eth", "log_cids"}
-}
-
-func (db *DB) LogColumnNames() []string {
-	return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
-}
-
-func (db *DB) RctTableName() []string {
-	return []string{"eth", "receipt_cids"}
-}
-
-func (db *DB) RctColumnNames() []string {
-	return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
-}
-
-func (db *DB) StateTableName() []string {
-	return []string{"eth", "state_cids"}
-}
-
-func (db *DB) StateColumnNames() []string {
-	return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
-}
-
-func (db *DB) StorageTableName() []string {
-	return []string{"eth", "storage_cids"}
-}
-
-func (db *DB) StorageColumnNames() []string {
-	return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
-}
-
-func (db *DB) TxTableName() []string {
-	return []string{"eth", "transaction_cids"}
-}
-
-func (db *DB) TxColumnNames() []string {
-	return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
-}
@@ -36,12 +36,6 @@ var (
 	ctx = context.Background()
 )
 
-func expectContainsSubstring(t *testing.T, full string, sub string) {
-	if !strings.Contains(full, sub) {
-		t.Fatalf("Expected \"%v\" to contain substring \"%v\"\n", full, sub)
-	}
-}
-
 func TestPostgresPGX(t *testing.T) {
 	t.Run("connects to the sql", func(t *testing.T) {
 		dbPool, err := pgxpool.ConnectConfig(context.Background(), pgxConfig)

@@ -105,7 +99,7 @@ func TestPostgresPGX(t *testing.T) {
 			t.Fatal("Expected an error")
 		}
 
-		expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg)
+		require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
 	})
 
 	t.Run("throws error when can't create node", func(t *testing.T) {

@@ -117,6 +111,6 @@ func TestPostgresPGX(t *testing.T) {
 			t.Fatal("Expected an error")
 		}
 
-		expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg)
+		require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
 	})
 }

@@ -102,7 +102,7 @@ func TestPostgresSQLX(t *testing.T) {
 			t.Fatal("Expected an error")
 		}
 
-		expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg)
+		require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
 	})
 
 	t.Run("throws error when can't create node", func(t *testing.T) {

@@ -114,6 +114,6 @@ func TestPostgresSQLX(t *testing.T) {
 			t.Fatal("Expected an error")
 		}
 
-		expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg)
+		require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
 	})
 }
@@ -29,6 +29,7 @@ import (
 	"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
 	"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
 	"github.com/cerc-io/plugeth-statediff/indexer/models"
+	"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
 )
 
 // Writer handles processing and writing of indexed IPLD objects to Postgres

@@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*
 	// pgx misdetects the parameter OIDs and selects int8, which can overflow.
 	// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
 	// and let PG handle the cast
-	err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
+	err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(),
+		strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
 	return gaps, err
 }
 

@@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
 			return insertError{"eth.transaction_cids", err, "COPY", transaction}
 		}
 
-		_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
+		_, err = tx.CopyFrom(w.db.Context(),
+			schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(),
 			toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
 				transaction.Src, transaction.Index, int(transaction.Type), value)))
 		if err != nil {

@@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
 			return insertError{"eth.receipt_cids", err, "COPY", rct}
 		}
 
-		_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
+		_, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(),
 			toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
 				rct.PostState, int(rct.PostStatus))))
 		if err != nil {

@@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
 				log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
 		}
 		if nil != rows && len(rows) >= 0 {
-			_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
+			_, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows)
 			if err != nil {
 				return insertError{"eth.log_cids", err, "COPY", rows}
 			}

@@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
 			return insertError{"eth.state_cids", err, "COPY", stateNode}
 		}
 
-		_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
+		_, err = tx.CopyFrom(w.db.Context(),
+			schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
 			toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
 				true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
 		if err != nil {

@@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
 			return insertError{"eth.storage_cids", err, "COPY", storageCID}
 		}
 
-		_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
+		_, err = tx.CopyFrom(w.db.Context(),
+			schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
 			toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
 				true, storageCID.Value, storageCID.Removed)))
 		if err != nil {
@@ -45,6 +45,7 @@ type Column struct {
 	Type colType
 	Array bool
 }
+
 type Table struct {
 	Name string
 	Columns []Column

@@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string {
 	)
 }
 
+// TableName returns a pgx-compatible table name.
+func (tbl *Table) TableName() []string {
+	return strings.Split(tbl.Name, ".")
+}
+
+// ColumnNames returns the ordered list of column names.
+func (tbl *Table) ColumnNames() []string {
+	var names []string
+	for _, col := range tbl.Columns {
+		names = append(names, col.Name)
+	}
+	return names
+}
+
 func sprintf(f string) colfmt {
 	return func(x interface{}) string { return fmt.Sprintf(f, x) }
 }
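These helpers exist because CopyFrom takes the table identifier as a []string and an ordered column list. A usage sketch, assuming the jackc/pgx/v4 driver (the Writer's comments reference pgx; the connection and pre-built rows are supplied by the caller):

```go
package example

import (
	"context"

	"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
	"github.com/jackc/pgx/v4"
)

// copyLogs bulk-inserts pre-built rows into eth.log_cids using the new Table
// helpers; pgx.Identifier is simply a []string of schema-qualified name parts.
func copyLogs(ctx context.Context, conn *pgx.Conn, rows [][]interface{}) (int64, error) {
	return conn.CopyFrom(ctx,
		pgx.Identifier(schema.TableLog.TableName()),
		schema.TableLog.ColumnNames(),
		pgx.CopyFromRows(rows),
	)
}
```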
@@ -46,7 +46,11 @@ func init() {
 	)
 	Flags.UintVar(&config.NumWorkers,
 		"statediff.workers", 1,
-		"Number of concurrent workers to use during statediff processing (default 1)",
+		"Number of concurrent workers to dispatch to during statediff processing",
+	)
+	Flags.UintVar(&config.SubtrieWorkers,
+		"statediff.subtries", 1,
+		"Number of subtries to iterate in parallel",
 	)
 	Flags.BoolVar(&config.WaitForSync,
 		"statediff.waitforsync", false,
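With the new flag registered, both parallelism settings can be passed together when the plugin is loaded, for example (illustrative values): --statediff.workers=4 --statediff.subtries=8.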
@@ -566,7 +566,9 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
 				},
 				StorageDiff: emptyStorage,
 			},
-			{ // this is the new account created due to the coinbase mining a block, it's creation shouldn't affect 0x 0e 05 07
+			{
+				// this is the new account created due to the coinbase mining a block, its
+				// creation shouldn't affect 0x 0e 05 07
 				Removed: false,
 				AccountWrapper: sdtypes.AccountWrapper{
 					Account: block3CoinbaseAccount,

@@ -622,11 +624,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
 		},
 	}
 
-	test_helpers.RunBuilderTests(t,
-		chain.StateCache(),
-		tests, params, test_helpers.CheckedRoots{
-			block1: block1RootBranchNode,
-			block2: block2RootBranchNode,
-			block3: block3RootBranchNode,
-		})
+	test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
+	test_helpers.CheckedRoots{
+		block1: block1RootBranchNode,
+		block2: block2RootBranchNode,
+		block3: block3RootBranchNode,
+	}.Check(t)
 }
service.go (22 changed lines)

@@ -166,10 +166,12 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
 		workers = 1
 	}
 
+	builder := NewBuilder(blockChain.StateCache())
+	builder.SetSubtrieWorkers(cfg.SubtrieWorkers)
 	quitCh := make(chan bool)
 	sds := &Service{
 		BlockChain: blockChain,
-		Builder: NewBuilder(blockChain.StateCache()),
+		Builder: builder,
 		QuitChan: quitCh,
 		Subscriptions: make(map[common.Hash]map[SubID]Subscription),
 		SubscriptionTypes: make(map[common.Hash]Params),

@@ -785,6 +787,10 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
 
 // Writes a state diff from the current block, parent state root, and provided params
 func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
+	if sds.indexer == nil {
+		return fmt.Errorf("indexer is not set; cannot write indexed diffs")
+	}
+
 	log := log.New("hash", block.Hash(), "number", block.Number())
 	if granted, relinquish := sds.claimExclusiveAccess(block); granted {
 		defer relinquish()

@@ -804,9 +810,6 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
 
 	start := countStateDiffBegin(block, log)
 	defer countStateDiffEnd(start, log, &err)
-	if sds.indexer == nil {
-		return fmt.Errorf("indexer is not set; cannot write indexed diffs")
-	}
 
 	if params.IncludeTD {
 		totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())

@@ -819,14 +822,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
 		return err
 	}
 
+	var nodeMtx, ipldMtx sync.Mutex
 	nodeSink := func(node types2.StateLeafNode) error {
-		defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log,
-			metrics.IndexerMetrics.OutputTimer)
+		defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
+		nodeMtx.Lock()
+		defer nodeMtx.Unlock()
 		return sds.indexer.PushStateNode(tx, node, block.Hash().String())
 	}
 	ipldSink := func(c types2.IPLD) error {
-		defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log,
-			metrics.IndexerMetrics.IPLDOutputTimer)
+		defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.IPLDOutputTimer)
+		ipldMtx.Lock()
+		defer ipldMtx.Unlock()
 		return sds.indexer.PushIPLD(tx, c)
 	}
 
@@ -2,12 +2,15 @@ package test_helpers
 
 import (
 	"bytes"
+	"fmt"
+	"math/big"
 	"sort"
 	"testing"
 
 	statediff "github.com/cerc-io/plugeth-statediff"
 	"github.com/cerc-io/plugeth-statediff/adapt"
 	sdtypes "github.com/cerc-io/plugeth-statediff/types"
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/crypto"

@@ -20,28 +23,42 @@ type TestCase struct {
 	Expected *sdtypes.StateObject
 }
 
-type CheckedRoots = map[*types.Block][]byte
+type CheckedRoots map[*types.Block][]byte
+
+// Replicates the statediff object, but indexes nodes by CID
+type normalizedStateDiff struct {
+	BlockNumber *big.Int
+	BlockHash common.Hash
+	Nodes map[string]sdtypes.StateLeafNode
+	IPLDs map[string]sdtypes.IPLD
+}
 
 func RunBuilderTests(
 	t *testing.T,
 	sdb state.Database,
 	tests []TestCase,
 	params statediff.Params,
-	roots CheckedRoots,
+	subtrieCounts []uint,
 ) {
 	builder := statediff.NewBuilder(adapt.GethStateView(sdb))
 	for _, test := range tests {
-		t.Run(test.Name, func(t *testing.T) {
-			diff, err := builder.BuildStateDiffObject(test.Args, params)
-			if err != nil {
-				t.Error(err)
-			}
-			normalize(test.Expected)
-			normalize(&diff)
-			require.Equal(t, *test.Expected, diff)
-		})
+		for _, subtries := range subtrieCounts {
+			t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) {
+				builder.SetSubtrieWorkers(subtries)
+				diff, err := builder.BuildStateDiffObject(test.Args, params)
+				if err != nil {
+					t.Error(err)
+				}
+				require.Equal(t,
+					normalize(test.Expected),
+					normalize(&diff),
+				)
+			})
+		}
 	}
+}
 
+func (roots CheckedRoots) Check(t *testing.T) {
 	// Let's also confirm that our root state nodes form the state root hash in the headers
 	for block, node := range roots {
 		require.Equal(t, block.Root(), crypto.Keccak256Hash(node),

@@ -49,17 +66,13 @@ func RunBuilderTests(
 	}
 }
 
-// Sorts contained state nodes, storage nodes, and IPLDs
-func normalize(diff *sdtypes.StateObject) {
-	sort.Slice(diff.IPLDs, func(i, j int) bool {
-		return diff.IPLDs[i].CID < diff.IPLDs[j].CID
-	})
-	sort.Slice(diff.Nodes, func(i, j int) bool {
-		return bytes.Compare(
-			diff.Nodes[i].AccountWrapper.LeafKey,
-			diff.Nodes[j].AccountWrapper.LeafKey,
-		) < 0
-	})
+func normalize(diff *sdtypes.StateObject) normalizedStateDiff {
+	norm := normalizedStateDiff{
+		BlockNumber: diff.BlockNumber,
+		BlockHash: diff.BlockHash,
+		Nodes: make(map[string]sdtypes.StateLeafNode),
+		IPLDs: make(map[string]sdtypes.IPLD),
+	}
 	for _, node := range diff.Nodes {
 		sort.Slice(node.StorageDiff, func(i, j int) bool {
 			return bytes.Compare(

@@ -67,5 +80,10 @@ func normalize(diff *sdtypes.StateObject) {
 				node.StorageDiff[j].LeafKey,
 			) < 0
 		})
+		norm.Nodes[node.AccountWrapper.CID] = node
 	}
+	for _, ipld := range diff.IPLDs {
+		norm.IPLDs[ipld.CID] = ipld
+	}
+	return norm
 }
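Because the subtrie workers append through syncedAppender, the order of Nodes and IPLDs in the result is no longer deterministic, so the helper now compares map-shaped views keyed by CID instead of sorted slices. The underlying idea, as a generic sketch (hypothetical helper, not repository code):

```go
package example

// byKey re-indexes an arbitrarily ordered slice by a unique key, so that two
// results produced by differently scheduled workers compare equal whenever
// they contain the same elements.
func byKey[T any](items []T, key func(T) string) map[string]T {
	out := make(map[string]T, len(items))
	for _, item := range items {
		out[key(item)] = item
	}
	return out
}
```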
@@ -58,6 +58,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
 	}
 	assert.Equal(t, 2, *count)
 
+	// TODO will fail until fixed https://github.com/ethereum/go-ethereum/pull/27838
 	trieb := trie.NewEmpty(db)
 	di, count = utils.NewSymmetricDifferenceIterator(
 		triea.NodeIterator([]byte("jars")),