Concurrent statediff iteration #12

Merged
roysc merged 14 commits from use-concurrent-iterator into main 2023-09-22 08:44:38 +00:00
18 changed files with 216 additions and 188 deletions

View File

@ -21,14 +21,18 @@ package statediff
import (
"bytes"
"context"
"fmt"
"sync"
"time"
iterutils "github.com/cerc-io/eth-iterator-utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
"golang.org/x/sync/errgroup"
"github.com/cerc-io/plugeth-statediff/adapt"
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
@ -44,6 +48,8 @@ var (
emptyContractRoot = crypto.Keccak256Hash(emptyNode)
nullCodeHash = crypto.Keccak256([]byte{})
zeroHash common.Hash
defaultSubtrieWorkers uint = 1
)
// Builder interface exposes the method for building a state diff between two blocks
@ -54,11 +60,8 @@ type Builder interface {
type StateDiffBuilder struct {
// state cache is safe for concurrent reads
stateCache adapt.StateView
}
type iterPair struct {
Older, Newer trie.NodeIterator
stateCache adapt.StateView
subtrieWorkers uint
}
type accountUpdate struct {
@ -74,19 +77,39 @@ func appender[T any](to *[]T) func(T) error {
}
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) Builder {
return &StateDiffBuilder{
stateCache: stateCache,
func syncedAppender[T any](to *[]T) func(T) error {
var mtx sync.Mutex
return func(a T) error {
mtx.Lock()
*to = append(*to, a)
mtx.Unlock()
return nil
}
}
// NewBuilder is used to create a statediff builder
func NewBuilder(stateCache adapt.StateView) *StateDiffBuilder {
return &StateDiffBuilder{
stateCache: stateCache,
subtrieWorkers: defaultSubtrieWorkers,
}
}
// SetSubtrieWorkers sets the number of disjoint subtries to divide among parallel workers.
// Passing 0 will reset this to the default value.
func (sdb *StateDiffBuilder) SetSubtrieWorkers(n uint) {
if n == 0 {
n = defaultSubtrieWorkers
}
sdb.subtrieWorkers = n
}
// BuildStateDiffObject builds a statediff object from two blocks and the provided parameters
func (sdb *StateDiffBuilder) BuildStateDiffObject(args Args, params Params) (sdtypes.StateObject, error) {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.BuildStateDiffObjectTimer)
var stateNodes []sdtypes.StateLeafNode
var iplds []sdtypes.IPLD
err := sdb.WriteStateDiff(args, params, appender(&stateNodes), appender(&iplds))
err := sdb.WriteStateDiff(args, params, syncedAppender(&stateNodes), syncedAppender(&iplds))
if err != nil {
return sdtypes.StateObject{}, err
}
@ -106,35 +129,39 @@ func (sdb *StateDiffBuilder) WriteStateDiff(
) error {
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.WriteStateDiffTimer)
// Load tries for old and new states
oldTrie, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
triea, err := sdb.stateCache.OpenTrie(args.OldStateRoot)
if err != nil {
return fmt.Errorf("error opening old state trie: %w", err)
}
newTrie, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
trieb, err := sdb.stateCache.OpenTrie(args.NewStateRoot)
if err != nil {
return fmt.Errorf("error opening new state trie: %w", err)
}
subitersA := iterutils.SubtrieIterators(triea.NodeIterator, uint(sdb.subtrieWorkers))
subitersB := iterutils.SubtrieIterators(trieb.NodeIterator, uint(sdb.subtrieWorkers))
iters := iterPair{
Older: oldTrie.NodeIterator(nil),
Newer: newTrie.NodeIterator(nil),
}
logger := log.New("hash", args.BlockHash, "number", args.BlockNumber)
err = sdb.processAccounts(
iters.Older, iters.Newer,
params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger)
if err != nil {
return fmt.Errorf("error collecting createdAndUpdatedNodes: %w", err)
// errgroup will cancel if any gr fails
g, ctx := errgroup.WithContext(context.Background())
for i := uint(0); i < sdb.subtrieWorkers; i++ {
func(subdiv uint) {
g.Go(func() error {
a, b := subitersA[subdiv], subitersB[subdiv]
return sdb.processAccounts(ctx,
a, b, params.watchedAddressesLeafPaths,
nodeSink, ipldSink, logger,
)
})
}(i)
}
return nil
return g.Wait()
}
// processAccounts processes account creations and deletions, and returns a set of updated
// existing accounts, indexed by leaf key.
func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
watchedAddressesLeafPaths [][]byte,
func (sdb *StateDiffBuilder) processAccounts(
ctx context.Context,
a, b trie.NodeIterator, watchedAddressesLeafPaths [][]byte,
nodeSink sdtypes.StateNodeSink, ipldSink sdtypes.IPLDSink,
logger log.Logger,
) error {
@ -146,7 +173,14 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
// Cache the RLP of the previous node. When we hit a value node this will be the parent blob.
var prevBlob []byte
it, itCount := utils.NewSymmetricDifferenceIterator(a, b)
prevBlob = it.NodeBlob()
for it.Next(true) {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// ignore node if it is not along paths of interest
if !isWatchedPathPrefix(watchedAddressesLeafPaths, it.Path()) {
continue
@ -205,6 +239,7 @@ func (sdb *StateDiffBuilder) processAccounts(a, b trie.NodeIterator,
if it.Hash() == zeroHash {
continue
}
// TODO - this can be handled when value node is (craeted?)
nodeVal := make([]byte, len(it.NodeBlob()))
copy(nodeVal, it.NodeBlob())
// if doing a selective diff, we need to ensure this is a watched path

View File

@ -795,13 +795,13 @@ func TestBuilder(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(),
tests, params, test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
block2: block2BranchRootNode,
block3: block3BranchRootNode,
})
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
block2: block2BranchRootNode,
block3: block3BranchRootNode,
}.Check(t)
}
func TestBuilderWithWatchedAddressList(t *testing.T) {
@ -1009,12 +1009,13 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block0: bankAccountAtBlock0LeafNode,
block1: block1BranchRootNode,
block2: block2BranchRootNode,
block3: block3BranchRootNode,
})
}.Check(t)
}
func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
@ -1259,11 +1260,12 @@ func TestBuilderWithRemovedAccountAndStorage(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
})
}.Check(t)
}
func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
@ -1393,11 +1395,12 @@ func TestBuilderWithRemovedNonWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
})
}.Check(t)
}
func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
@ -1596,11 +1599,12 @@ func TestBuilderWithRemovedWatchedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block4: block4BranchRootNode,
block5: block5BranchRootNode,
block6: block6BranchRootNode,
})
}.Check(t)
}
var (
@ -1823,10 +1827,11 @@ func TestBuilderWithMovedAccount(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block01BranchRootNode,
block2: bankAccountAtBlock02LeafNode,
})
}.Check(t)
}
/*
@ -2349,11 +2354,12 @@ func TestBuilderWithInternalizedLeafNode(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,
block3: block3bBranchRootNode,
})
}.Check(t)
}
func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
@ -2550,11 +2556,12 @@ func TestBuilderWithInternalizedLeafNodeAndWatchedAddress(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, test_helpers.CheckedRoots{
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block1bBranchRootNode,
block2: block2bBranchRootNode,
block3: block3bBranchRootNode,
})
}.Check(t)
}
/*

View File

@ -44,6 +44,8 @@ type Config struct {
BackfillCheckPastBlocks uint64
// Size of the worker pool
NumWorkers uint
// Number of subtries to iterate in parallel
SubtrieWorkers uint
// Should the statediff service wait until geth has synced to the head of the blockchain?
WaitForSync bool
// Context used during DB initialization

8
go.mod
View File

@ -3,6 +3,7 @@ module github.com/cerc-io/plugeth-statediff
go 1.19
require (
github.com/cerc-io/eth-iterator-utils v0.1.1
github.com/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum v1.11.6
github.com/georgysavva/scany v0.2.9
@ -18,8 +19,9 @@ require (
github.com/openrelayxyz/plugeth-utils v1.2.0
github.com/pganalyze/pg_query_go/v4 v4.2.1
github.com/shopspring/decimal v1.2.0
github.com/stretchr/testify v1.8.1
github.com/thoas/go-funk v0.9.2
github.com/stretchr/testify v1.8.2
github.com/thoas/go-funk v0.9.3
golang.org/x/sync v0.1.0
)
require (
@ -109,7 +111,6 @@ require (
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
@ -123,6 +124,7 @@ require (
)
replace (
github.com/cerc-io/eth-iterator-utils => git.vdb.to/cerc-io/eth-iterator-utils v0.1.1
github.com/cerc-io/eth-testing => git.vdb.to/cerc-io/eth-testing v0.2.1
github.com/ethereum/go-ethereum => git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1
github.com/openrelayxyz/plugeth-utils => git.vdb.to/cerc-io/plugeth-utils v0.0.0-20230706160122-cd41de354c46

10
go.sum
View File

@ -1,4 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.1 h1:AGen4U2GaYJVzPjEo3U+GPczSfOUEMkM1nWTM+cq5Dk=
git.vdb.to/cerc-io/eth-iterator-utils v0.1.1/go.mod h1:uiocO9elfDe78kd3c/VZ2in26V+gyXJuN+sdTxK4Xag=
git.vdb.to/cerc-io/eth-testing v0.2.1 h1:IZAX7DVgzPkSmu1xdKZ5aOemdEYbvtgae7GUl/TUNtQ=
git.vdb.to/cerc-io/eth-testing v0.2.1/go.mod h1:qdvpc/W1xvf2MKx3rMOqvFvYaYIHG77Z1g0lwsmw0Uk=
git.vdb.to/cerc-io/plugeth v0.0.0-20230808125822-691dc334fab1 h1:KLjxHwp9Zp7xhECccmJS00RiL+VwTuUGLU7qeIctg8g=
@ -493,12 +495,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk=
github.com/thoas/go-funk v0.9.2/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/thoas/go-funk v0.9.3 h1:7+nAEx3kn5ZJcnDm2Bh23N2yOtweO14bi//dvRtgLpw=
github.com/thoas/go-funk v0.9.3/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/tklauser/go-sysconf v0.3.5 h1:uu3Xl4nkLzQfXNsWn15rPc/HQCJKObbt1dKJeWp3vU4=
github.com/tklauser/go-sysconf v0.3.5/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI=
github.com/tklauser/numcpus v0.2.2 h1:oyhllyrScuYI6g+h/zUvNXNp1wy7x8qQy3t/piefldA=

View File

@ -57,7 +57,7 @@ type StateDiffIndexer struct {
chainConfig *params.ChainConfig
nodeID string
wg *sync.WaitGroup
removedCacheFlag *uint32
removedCacheFlag uint32
}
// NewStateDiffIndexer creates a void implementation of interfaces.StateDiffIndexer
@ -130,7 +130,6 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(time.Duration, <-chan bool) {}
// PushBlock pushes and indexes block data in sql, except state & storage nodes (includes header, uncles, transactions & receipts)
// Returns an initiated DB transaction which must be Closed via defer to commit or rollback
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
sdi.removedCacheFlag = new(uint32)
start, t := time.Now(), time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
@ -223,11 +222,6 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
func (sdi *StateDiffIndexer) processHeader(header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) string {
sdi.fileWriter.upsertIPLDNode(header.Number.String(), headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String()
sdi.fileWriter.upsertHeaderCID(models.HeaderModel{
NodeIDs: pq.StringArray([]string{sdi.nodeID}),
@ -388,8 +382,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// publish the state node
var stateModel models.StateNodeModel
if stateNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStateCID, []byte{})
}
stateModel = models.StateNodeModel{
@ -419,8 +413,8 @@ func (sdi *StateDiffIndexer) PushStateNode(batch interfaces.Batch, stateNode sdt
// if there are any storage nodes associated with this node, publish and index them
for _, storageNode := range stateNode.StorageDiff {
if storageNode.Removed {
if atomic.LoadUint32(sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(sdi.removedCacheFlag, 1)
if atomic.LoadUint32(&sdi.removedCacheFlag) == 0 {
atomic.StoreUint32(&sdi.removedCacheFlag, 1)
sdi.fileWriter.upsertIPLDDirect(tx.BlockNumber, shared.RemovedNodeStorageCID, []byte{})
}
storageModel := models.StorageNodeModel{

View File

@ -84,9 +84,8 @@ func (sdi *StateDiffIndexer) ReportDBMetrics(delay time.Duration, quit <-chan bo
func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receipts, totalDifficulty *big.Int) (interfaces.Batch, error) {
start, t := time.Now(), time.Now()
blockHash := block.Hash()
blockHashStr := blockHash.String()
height := block.NumberU64()
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHashStr)
traceMsg := fmt.Sprintf("indexer stats for statediff at %d with hash %s:\r\n", height, blockHash)
transactions := block.Transactions()
// Derive any missing fields
if err := receipts.DeriveFields(sdi.chainConfig, blockHash, height, block.BaseFee(), transactions); err != nil {
@ -155,20 +154,20 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
} else {
tDiff := time.Since(t)
metrics2.IndexerMetrics.StateStoreCodeProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf("state, storage, and code storage processing time: %s\r\n", tDiff)
t = time.Now()
if err := self.flush(); err != nil {
rollback(sdi.ctx, tx)
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
}
err = tx.Commit(sdi.ctx)
tDiff = time.Since(t)
metrics2.IndexerMetrics.PostgresCommitTimer.Update(tDiff)
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf("postgres transaction commit duration: %s\r\n", tDiff)
}
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start).String())
traceMsg += fmt.Sprintf(" TOTAL PROCESSING DURATION: %s\r\n", time.Since(start))
log.Debug(traceMsg)
return err
},
@ -178,7 +177,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
tDiff := time.Since(t)
metrics2.IndexerMetrics.FreePostgresTimer.Update(tDiff)
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff.String())
traceMsg += fmt.Sprintf("time spent waiting for free postgres tx: %s:\r\n", tDiff)
t = time.Now()
// Publish and index header, collect headerID
@ -189,7 +188,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.HeaderProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf("header processing time: %s\r\n", tDiff)
t = time.Now()
// Publish and index uncles
err = sdi.processUncles(blockTx, headerID, block.Number(), block.UncleHash(), block.Uncles())
@ -198,7 +197,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.UncleProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf("uncle processing time: %s\r\n", tDiff)
t = time.Now()
// Publish and index receipts and txs
err = sdi.processReceiptsAndTxs(blockTx, processArgs{
@ -215,7 +214,7 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
}
tDiff = time.Since(t)
metrics2.IndexerMetrics.TxAndRecProcessingTimer.Update(tDiff)
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff.String())
traceMsg += fmt.Sprintf("tx and receipt processing time: %s\r\n", tDiff)
t = time.Now()
return blockTx, err
@ -236,11 +235,6 @@ func (sdi *StateDiffIndexer) DetectGaps(beginBlockNumber uint64, endBlockNumber
func (sdi *StateDiffIndexer) processHeader(tx *BatchTx, header *types.Header, headerNode ipld.IPLD, reward, td *big.Int) (string, error) {
tx.cacheIPLD(headerNode)
var baseFee *string
if header.BaseFee != nil {
baseFee = new(string)
*baseFee = header.BaseFee.String()
}
headerID := header.Hash().String()
// index header
return headerID, sdi.dbWriter.upsertHeaderCID(tx.dbtx, models.HeaderModel{

View File

@ -58,18 +58,6 @@ type Statements interface {
InsertStorageStm() string
InsertIPLDStm() string
InsertIPLDsStm() string
// Table/column descriptions for use with CopyFrom and similar commands.
LogTableName() []string
LogColumnNames() []string
RctTableName() []string
RctColumnNames() []string
StateTableName() []string
StateColumnNames() []string
StorageTableName() []string
StorageColumnNames() []string
TxTableName() []string
TxColumnNames() []string
}
// Tx interface to accommodate different concrete SQL transaction types

View File

@ -107,43 +107,3 @@ func (db *DB) InsertIPLDStm() string {
func (db *DB) InsertIPLDsStm() string {
return `INSERT INTO ipld.blocks (block_number, key, data) VALUES (unnest($1::BIGINT[]), unnest($2::TEXT[]), unnest($3::BYTEA[])) ON CONFLICT DO NOTHING`
}
func (db *DB) LogTableName() []string {
return []string{"eth", "log_cids"}
}
func (db *DB) LogColumnNames() []string {
return []string{"block_number", "header_id", "cid", "rct_id", "address", "index", "topic0", "topic1", "topic2", "topic3"}
}
func (db *DB) RctTableName() []string {
return []string{"eth", "receipt_cids"}
}
func (db *DB) RctColumnNames() []string {
return []string{"block_number", "header_id", "tx_id", "cid", "contract", "post_state", "post_status"}
}
func (db *DB) StateTableName() []string {
return []string{"eth", "state_cids"}
}
func (db *DB) StateColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "cid", "diff", "balance", "nonce", "code_hash", "storage_root", "removed"}
}
func (db *DB) StorageTableName() []string {
return []string{"eth", "storage_cids"}
}
func (db *DB) StorageColumnNames() []string {
return []string{"block_number", "header_id", "state_leaf_key", "storage_leaf_key", "cid", "diff", "val", "removed"}
}
func (db *DB) TxTableName() []string {
return []string{"eth", "transaction_cids"}
}
func (db *DB) TxColumnNames() []string {
return []string{"block_number", "header_id", "tx_hash", "cid", "dst", "src", "index", "tx_type", "value"}
}

View File

@ -36,12 +36,6 @@ var (
ctx = context.Background()
)
func expectContainsSubstring(t *testing.T, full string, sub string) {
if !strings.Contains(full, sub) {
t.Fatalf("Expected \"%v\" to contain substring \"%v\"\n", full, sub)
}
}
func TestPostgresPGX(t *testing.T) {
t.Run("connects to the sql", func(t *testing.T) {
dbPool, err := pgxpool.ConnectConfig(context.Background(), pgxConfig)
@ -105,7 +99,7 @@ func TestPostgresPGX(t *testing.T) {
t.Fatal("Expected an error")
}
expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg)
require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
})
t.Run("throws error when can't create node", func(t *testing.T) {
@ -117,6 +111,6 @@ func TestPostgresPGX(t *testing.T) {
t.Fatal("Expected an error")
}
expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg)
require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
})
}

View File

@ -102,7 +102,7 @@ func TestPostgresSQLX(t *testing.T) {
t.Fatal("Expected an error")
}
expectContainsSubstring(t, err.Error(), postgres.DbConnectionFailedMsg)
require.Contains(t, err.Error(), postgres.DbConnectionFailedMsg)
})
t.Run("throws error when can't create node", func(t *testing.T) {
@ -114,6 +114,6 @@ func TestPostgresSQLX(t *testing.T) {
t.Fatal("Expected an error")
}
expectContainsSubstring(t, err.Error(), postgres.SettingNodeFailedMsg)
require.Contains(t, err.Error(), postgres.SettingNodeFailedMsg)
})
}

View File

@ -29,6 +29,7 @@ import (
"github.com/cerc-io/plugeth-statediff/indexer/database/metrics"
"github.com/cerc-io/plugeth-statediff/indexer/interfaces"
"github.com/cerc-io/plugeth-statediff/indexer/models"
"github.com/cerc-io/plugeth-statediff/indexer/shared/schema"
)
// Writer handles processing and writing of indexed IPLD objects to Postgres
@ -65,7 +66,8 @@ func (w *Writer) detectGaps(beginBlockNumber uint64, endBlockNumber uint64) ([]*
// pgx misdetects the parameter OIDs and selects int8, which can overflow.
// unfortunately there is no good place to override it, so it is safer to pass the uint64s as text
// and let PG handle the cast
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(), strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
err := w.db.Select(w.db.Context(), &gaps, w.db.DetectGapsStm(),
strconv.FormatUint(beginBlockNumber, 10), strconv.FormatUint(endBlockNumber, 10))
return gaps, err
}
@ -177,7 +179,8 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
return insertError{"eth.transaction_cids", err, "COPY", transaction}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.TxTableName(), w.db.TxColumnNames(),
_, err = tx.CopyFrom(w.db.Context(),
schema.TableTransaction.TableName(), schema.TableTransaction.ColumnNames(),
toRows(toRow(blockNum, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst,
transaction.Src, transaction.Index, int(transaction.Type), value)))
if err != nil {
@ -213,7 +216,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
return insertError{"eth.receipt_cids", err, "COPY", rct}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.RctTableName(), w.db.RctColumnNames(),
_, err = tx.CopyFrom(w.db.Context(), schema.TableReceipt.TableName(), schema.TableReceipt.ColumnNames(),
toRows(toRow(blockNum, rct.HeaderID, rct.TxID, rct.CID, rct.Contract,
rct.PostState, int(rct.PostStatus))))
if err != nil {
@ -253,7 +256,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3))
}
if nil != rows && len(rows) >= 0 {
_, err := tx.CopyFrom(w.db.Context(), w.db.LogTableName(), w.db.LogColumnNames(), rows)
_, err := tx.CopyFrom(w.db.Context(), schema.TableLog.TableName(), schema.TableLog.ColumnNames(), rows)
if err != nil {
return insertError{"eth.log_cids", err, "COPY", rows}
}
@ -302,7 +305,8 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
return insertError{"eth.state_cids", err, "COPY", stateNode}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.StateTableName(), w.db.StateColumnNames(),
_, err = tx.CopyFrom(w.db.Context(),
schema.TableStateNode.TableName(), schema.TableStateNode.ColumnNames(),
toRows(toRow(blockNum, stateNode.HeaderID, stateNode.StateKey, stateNode.CID,
true, balance, stateNode.Nonce, stateNode.CodeHash, stateNode.StorageRoot, stateNode.Removed)))
if err != nil {
@ -339,7 +343,8 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
return insertError{"eth.storage_cids", err, "COPY", storageCID}
}
_, err = tx.CopyFrom(w.db.Context(), w.db.StorageTableName(), w.db.StorageColumnNames(),
_, err = tx.CopyFrom(w.db.Context(),
schema.TableStorageNode.TableName(), schema.TableStorageNode.ColumnNames(),
toRows(toRow(blockNum, storageCID.HeaderID, storageCID.StateKey, storageCID.StorageKey, storageCID.CID,
true, storageCID.Value, storageCID.Removed)))
if err != nil {

View File

@ -45,6 +45,7 @@ type Column struct {
Type colType
Array bool
}
type Table struct {
Name string
Columns []Column
@ -117,6 +118,20 @@ func (tbl *Table) ToInsertStatement(upsert bool) string {
)
}
// TableName returns a pgx-compatible table name.
func (tbl *Table) TableName() []string {
return strings.Split(tbl.Name, ".")
}
// ColumnNames returns the ordered list of column names.
func (tbl *Table) ColumnNames() []string {
var names []string
for _, col := range tbl.Columns {
names = append(names, col.Name)
}
return names
}
func sprintf(f string) colfmt {
return func(x interface{}) string { return fmt.Sprintf(f, x) }
}

View File

@ -46,7 +46,11 @@ func init() {
)
Flags.UintVar(&config.NumWorkers,
"statediff.workers", 1,
"Number of concurrent workers to use during statediff processing (default 1)",
"Number of concurrent workers to dispatch to during statediff processing",
)
Flags.UintVar(&config.SubtrieWorkers,
"statediff.subtries", 1,
"Number of subtries to iterate in parallel",
)
Flags.BoolVar(&config.WaitForSync,
"statediff.waitforsync", false,

View File

@ -566,7 +566,9 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
},
StorageDiff: emptyStorage,
},
{ // this is the new account created due to the coinbase mining a block, it's creation shouldn't affect 0x 0e 05 07
{
// this is the new account created due to the coinbase mining a block, its
// creation shouldn't affect 0x 0e 05 07
Removed: false,
AccountWrapper: sdtypes.AccountWrapper{
Account: block3CoinbaseAccount,
@ -622,11 +624,10 @@ func TestBuilderOnMainnetBlocks(t *testing.T) {
},
}
test_helpers.RunBuilderTests(t,
chain.StateCache(),
tests, params, test_helpers.CheckedRoots{
block1: block1RootBranchNode,
block2: block2RootBranchNode,
block3: block3RootBranchNode,
})
test_helpers.RunBuilderTests(t, chain.StateCache(), tests, params, []uint{1, 8, 32})
test_helpers.CheckedRoots{
block1: block1RootBranchNode,
block2: block2RootBranchNode,
block3: block3RootBranchNode,
}.Check(t)
}

View File

@ -166,10 +166,12 @@ func NewService(cfg Config, blockChain BlockChain, backend plugeth.Backend, inde
workers = 1
}
builder := NewBuilder(blockChain.StateCache())
builder.SetSubtrieWorkers(cfg.SubtrieWorkers)
quitCh := make(chan bool)
sds := &Service{
BlockChain: blockChain,
Builder: NewBuilder(blockChain.StateCache()),
Builder: builder,
QuitChan: quitCh,
Subscriptions: make(map[common.Hash]map[SubID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params),
@ -785,6 +787,10 @@ func (sds *Service) claimExclusiveAccess(block *types.Block) (bool, func()) {
// Writes a state diff from the current block, parent state root, and provided params
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
}
log := log.New("hash", block.Hash(), "number", block.Number())
if granted, relinquish := sds.claimExclusiveAccess(block); granted {
defer relinquish()
@ -804,9 +810,6 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
start := countStateDiffBegin(block, log)
defer countStateDiffEnd(start, log, &err)
if sds.indexer == nil {
return fmt.Errorf("indexer is not set; cannot write indexed diffs")
}
if params.IncludeTD {
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
@ -819,14 +822,17 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
return err
}
var nodeMtx, ipldMtx sync.Mutex
nodeSink := func(node types2.StateLeafNode) error {
defer metrics.ReportAndUpdateDuration("statediff output", time.Now(), log,
metrics.IndexerMetrics.OutputTimer)
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.OutputTimer)
nodeMtx.Lock()
defer nodeMtx.Unlock()
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
}
ipldSink := func(c types2.IPLD) error {
defer metrics.ReportAndUpdateDuration("statediff ipldOutput", time.Now(), log,
metrics.IndexerMetrics.IPLDOutputTimer)
defer metrics.UpdateDuration(time.Now(), metrics.IndexerMetrics.IPLDOutputTimer)
ipldMtx.Lock()
defer ipldMtx.Unlock()
return sds.indexer.PushIPLD(tx, c)
}

View File

@ -2,12 +2,15 @@ package test_helpers
import (
"bytes"
"fmt"
"math/big"
"sort"
"testing"
statediff "github.com/cerc-io/plugeth-statediff"
"github.com/cerc-io/plugeth-statediff/adapt"
sdtypes "github.com/cerc-io/plugeth-statediff/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
@ -20,28 +23,42 @@ type TestCase struct {
Expected *sdtypes.StateObject
}
type CheckedRoots = map[*types.Block][]byte
type CheckedRoots map[*types.Block][]byte
// Replicates the statediff object, but indexes nodes by CID
type normalizedStateDiff struct {
BlockNumber *big.Int
BlockHash common.Hash
Nodes map[string]sdtypes.StateLeafNode
IPLDs map[string]sdtypes.IPLD
}
func RunBuilderTests(
t *testing.T,
sdb state.Database,
tests []TestCase,
params statediff.Params,
roots CheckedRoots,
subtrieCounts []uint,
) {
builder := statediff.NewBuilder(adapt.GethStateView(sdb))
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
diff, err := builder.BuildStateDiffObject(test.Args, params)
if err != nil {
t.Error(err)
}
normalize(test.Expected)
normalize(&diff)
require.Equal(t, *test.Expected, diff)
})
for _, subtries := range subtrieCounts {
t.Run(fmt.Sprintf("%s with %d subtries", test.Name, subtries), func(t *testing.T) {
builder.SetSubtrieWorkers(subtries)
diff, err := builder.BuildStateDiffObject(test.Args, params)
if err != nil {
t.Error(err)
}
require.Equal(t,
normalize(test.Expected),
normalize(&diff),
)
})
}
}
}
func (roots CheckedRoots) Check(t *testing.T) {
// Let's also confirm that our root state nodes form the state root hash in the headers
for block, node := range roots {
require.Equal(t, block.Root(), crypto.Keccak256Hash(node),
@ -49,17 +66,13 @@ func RunBuilderTests(
}
}
// Sorts contained state nodes, storage nodes, and IPLDs
func normalize(diff *sdtypes.StateObject) {
sort.Slice(diff.IPLDs, func(i, j int) bool {
return diff.IPLDs[i].CID < diff.IPLDs[j].CID
})
sort.Slice(diff.Nodes, func(i, j int) bool {
return bytes.Compare(
diff.Nodes[i].AccountWrapper.LeafKey,
diff.Nodes[j].AccountWrapper.LeafKey,
) < 0
})
func normalize(diff *sdtypes.StateObject) normalizedStateDiff {
norm := normalizedStateDiff{
BlockNumber: diff.BlockNumber,
BlockHash: diff.BlockHash,
Nodes: make(map[string]sdtypes.StateLeafNode),
IPLDs: make(map[string]sdtypes.IPLD),
}
for _, node := range diff.Nodes {
sort.Slice(node.StorageDiff, func(i, j int) bool {
return bytes.Compare(
@ -67,5 +80,10 @@ func normalize(diff *sdtypes.StateObject) {
node.StorageDiff[j].LeafKey,
) < 0
})
norm.Nodes[node.AccountWrapper.CID] = node
}
for _, ipld := range diff.IPLDs {
norm.IPLDs[ipld.CID] = ipld
}
return norm
}

View File

@ -58,6 +58,7 @@ func TestSymmetricDifferenceIterator(t *testing.T) {
}
assert.Equal(t, 2, *count)
// TODO will fail until fixed https://github.com/ethereum/go-ethereum/pull/27838
trieb := trie.NewEmpty(db)
di, count = utils.NewSymmetricDifferenceIterator(
triea.NodeIterator([]byte("jars")),