Merge branch 'v1.10.26-statediff-v4' into port_iterator
This commit is contained in:
commit
eb482ee783
21
.github/workflows/manual_binary_publish.yaml
vendored
Normal file
21
.github/workflows/manual_binary_publish.yaml
vendored
Normal file
@ -0,0 +1,21 @@
|
||||
name: MANUAL Override Publish geth binary to release
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
giteaPublishTag:
|
||||
description: 'Package to publish TO on gitea; e.g. v1.10.25-statediff-4.2.1-alpha'
|
||||
required: true
|
||||
cercContainerTag:
|
||||
description: 'Tagged Container to extract geth binary FROM'
|
||||
required: true
|
||||
jobs:
|
||||
build:
|
||||
name: Manual override publish of geth binary FROM tagged release TO TAGGED package on git.vdb.to
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Copy ethereum binary file
|
||||
run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{ github.event.inputs.cercContainerTag }} /usr/local/bin/geth > geth-linux-amd64
|
||||
- name: curl
|
||||
uses: enflo/curl-action@master
|
||||
with:
|
||||
curl: --user cerccicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{ github.event.inputs.giteaPublishTag }}/geth-linux-amd64
|
21
.github/workflows/manual_publish.yaml
vendored
Normal file
21
.github/workflows/manual_publish.yaml
vendored
Normal file
@ -0,0 +1,21 @@
|
||||
name: MANUAL Override Publish geth binary to release
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
giteaPublishTag:
|
||||
description: 'Package to publish TO on gitea; e.g. v1.10.25-statediff-4.2.1-alpha'
|
||||
required: true
|
||||
cercContainerTag:
|
||||
description: 'Tagged Container to extract geth binary FROM'
|
||||
required: true
|
||||
jobs:
|
||||
build:
|
||||
name: Manual override publish of geth binary FROM tagged release TO TAGGED package on git.vdb.to
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Copy ethereum binary file
|
||||
run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{ github.event.inputs.cercContainerTag }} /usr/local/bin/geth > geth-linux-amd64
|
||||
- name: curl
|
||||
uses: enflo/curl-action@master
|
||||
with:
|
||||
curl: --user circcicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{ github.event.inputs.giteaPublishTag }}/geth-linux-amd64
|
55
.github/workflows/publish.yaml
vendored
55
.github/workflows/publish.yaml
vendored
@ -12,50 +12,25 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Run docker build
|
||||
run: docker build -t vulcanize/go-ethereum -f Dockerfile .
|
||||
- name: Get the version
|
||||
id: vars
|
||||
run: echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
|
||||
- name: Tag docker image
|
||||
run: docker tag vulcanize/go-ethereum docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
|
||||
- name: Docker Login
|
||||
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
|
||||
- name: Docker Push
|
||||
run: docker push docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
|
||||
push_to_registries:
|
||||
name: Publish assets to Release
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
steps:
|
||||
run: docker build -t cerc-io/go-ethereum -f Dockerfile .
|
||||
- name: Get the version
|
||||
id: vars
|
||||
run: |
|
||||
echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
|
||||
echo ::set-output name=tag::$(echo ${GITHUB_REF#refs/tags/})
|
||||
- name: Docker Login to Github Registry
|
||||
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
|
||||
- name: Docker Pull
|
||||
run: docker pull docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
|
||||
- name: Copy ethereum binary file
|
||||
run: docker run --rm --entrypoint cat docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} /usr/local/bin/geth > geth-linux-amd64
|
||||
- name: Docker Login to Docker Registry
|
||||
run: echo ${{ secrets.VULCANIZEJENKINS_PAT }} | docker login -u vulcanizejenkins --password-stdin
|
||||
- name: Tag docker image
|
||||
run: docker tag docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} vulcanize/vdb-geth:${{steps.vars.outputs.tag}}
|
||||
- name: Docker Push to Docker Hub
|
||||
run: docker push vulcanize/vdb-geth:${{steps.vars.outputs.tag}}
|
||||
- name: Get release
|
||||
id: get_release
|
||||
uses: bruceadams/get-release@v1.2.0
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Upload Release Asset
|
||||
id: upload-release-asset
|
||||
uses: actions/upload-release-asset@v1
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: docker tag cerc-io/go-ethereum git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
|
||||
- name: Tag docker image
|
||||
run: docker tag git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.tag}}
|
||||
- name: Docker Login
|
||||
run: echo ${{ secrets.GITEA_TOKEN }} | docker login https://git.vdb.to -u cerccicd --password-stdin
|
||||
- name: Docker Push
|
||||
run: docker push git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
|
||||
- name: Docker Push TAGGED
|
||||
run: docker push git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.tag}}
|
||||
- name: Copy ethereum binary file
|
||||
run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} /usr/local/bin/geth > geth-linux-amd64
|
||||
- name: curl
|
||||
uses: enflo/curl-action@master
|
||||
with:
|
||||
upload_url: ${{ steps.get_release.outputs.upload_url }}
|
||||
asset_path: geth-linux-amd64
|
||||
asset_name: geth-linux-amd64
|
||||
asset_content_type: application/octet-stream
|
||||
curl: --user cerccicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{steps.vars.outputs.tag}}/geth-linux-amd64
|
||||
|
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@ -15,7 +15,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Run docker build
|
||||
run: docker build -t vulcanize/go-ethereum .
|
||||
run: docker build -t cerc-io/go-ethereum .
|
||||
|
||||
geth-unit-test:
|
||||
name: Run geth unit test
|
||||
@ -90,7 +90,7 @@ jobs:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ env.ipld-eth-db-ref }}
|
||||
repository: vulcanize/ipld-eth-db
|
||||
repository: cerc-io/ipld-eth-db
|
||||
path: "./ipld-eth-db/"
|
||||
fetch-depth: 0
|
||||
|
||||
|
50
Jenkinsfile
vendored
Normal file
50
Jenkinsfile
vendored
Normal file
@ -0,0 +1,50 @@
|
||||
pipeline {
|
||||
agent any
|
||||
|
||||
stages {
|
||||
stage('Build') {
|
||||
steps {
|
||||
script{
|
||||
docker.withRegistry('https://git.vdb.to'){
|
||||
echo 'Building geth image...'
|
||||
//def geth_image = docker.build("cerc-io/go-ethereum:jenkinscicd")
|
||||
echo 'built geth image'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
stage('Test') {
|
||||
agent {
|
||||
docker {
|
||||
image 'cerc-io/foundation:jenkinscicd'
|
||||
//image 'cerc-io/foundation_alpine:jenkinscicd'
|
||||
}
|
||||
}
|
||||
|
||||
environment {
|
||||
GO111MODULE = "on"
|
||||
CGO_ENABLED = 1
|
||||
//GOPATH = "${JENKINS_HOME}/jobs/${JOB_NAME}/builds/${BUILD_ID}"
|
||||
//GOPATH = "/go"
|
||||
GOPATH = "/tmp/go"
|
||||
//GOMODCACHE = "/go/pkg/mod"
|
||||
GOCACHE = "${WORKSPACE}/.cache/go-build"
|
||||
GOENV = "${WORKSPACE}/.config/go/env"
|
||||
GOMODCACHE = "/tmp/go/pkg/mod"
|
||||
GOWORK=""
|
||||
//GOFLAGS=""
|
||||
|
||||
}
|
||||
steps {
|
||||
echo 'Testing ...'
|
||||
//sh '/usr/local/go/bin/go test -p 1 -v ./...'
|
||||
sh 'make test'
|
||||
}
|
||||
}
|
||||
stage('Packaging') {
|
||||
steps {
|
||||
echo 'Packaging ...'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
2
Makefile
2
Makefile
@ -50,7 +50,7 @@ ios:
|
||||
@echo "Import \"$(GOBIN)/Geth.framework\" to use the library."
|
||||
|
||||
test: all
|
||||
$(GORUN) build/ci.go test
|
||||
$(GORUN) build/ci.go test -v
|
||||
|
||||
lint: ## Run linters.
|
||||
$(GORUN) build/ci.go lint
|
||||
|
@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
ClientName: clientName,
|
||||
Driver: driverType,
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffUpsert.Name) {
|
||||
pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name)
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffDBMinConns.Name) {
|
||||
pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name)
|
||||
}
|
||||
@ -262,6 +265,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
if ctx.IsSet(utils.StateDiffDBConnTimeout.Name) {
|
||||
pgConfig.ConnTimeout = time.Duration(ctx.Duration(utils.StateDiffDBConnTimeout.Name).Seconds())
|
||||
}
|
||||
if ctx.IsSet(utils.StateDiffLogStatements.Name) {
|
||||
pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name)
|
||||
}
|
||||
indexerConfig = pgConfig
|
||||
case shared.DUMP:
|
||||
dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name)
|
||||
@ -284,14 +290,13 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
|
||||
}
|
||||
}
|
||||
p := statediff.Config{
|
||||
IndexerConfig: indexerConfig,
|
||||
KnownGapsFilePath: ctx.String(utils.StateDiffKnownGapsFilePath.Name),
|
||||
ID: nodeID,
|
||||
ClientName: clientName,
|
||||
Context: context.Background(),
|
||||
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
||||
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
||||
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
||||
IndexerConfig: indexerConfig,
|
||||
ID: nodeID,
|
||||
ClientName: clientName,
|
||||
Context: context.Background(),
|
||||
EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name),
|
||||
NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name),
|
||||
WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name),
|
||||
}
|
||||
utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend)
|
||||
}
|
||||
|
@ -176,9 +176,10 @@ var (
|
||||
utils.StateDiffFileMode,
|
||||
utils.StateDiffFileCsvDir,
|
||||
utils.StateDiffFilePath,
|
||||
utils.StateDiffKnownGapsFilePath,
|
||||
utils.StateDiffWaitForSync,
|
||||
utils.StateDiffWatchedAddressesFilePath,
|
||||
utils.StateDiffUpsert,
|
||||
utils.StateDiffLogStatements,
|
||||
configFileFlag,
|
||||
}, utils.NetworkFlags, utils.DatabasePathFlags)
|
||||
|
||||
|
@ -1064,11 +1064,6 @@ var (
|
||||
Name: "statediff.file.path",
|
||||
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
|
||||
}
|
||||
StateDiffKnownGapsFilePath = &cli.StringFlag{
|
||||
Name: "statediff.knowngapsfile.path",
|
||||
Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.",
|
||||
Value: "./known_gaps.sql",
|
||||
}
|
||||
StateDiffWatchedAddressesFilePath = &cli.StringFlag{
|
||||
Name: "statediff.file.wapath",
|
||||
Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode",
|
||||
@ -1078,6 +1073,16 @@ var (
|
||||
Usage: "Client name to use when writing state diffs to database",
|
||||
Value: "go-ethereum",
|
||||
}
|
||||
StateDiffUpsert = &cli.BoolFlag{
|
||||
Name: "statediff.db.upsert",
|
||||
Usage: "Should the statediff service overwrite data existing in the database?",
|
||||
Value: false,
|
||||
}
|
||||
StateDiffLogStatements = &cli.BoolFlag{
|
||||
Name: "statediff.db.logstatements",
|
||||
Usage: "Should the statediff service log all database statements? (Note: pgx only)",
|
||||
Value: false,
|
||||
}
|
||||
StateDiffWritingFlag = &cli.BoolFlag{
|
||||
Name: "statediff.writing",
|
||||
Usage: "Activates progressive writing of state diffs to database as new block are synced",
|
||||
|
@ -644,6 +644,23 @@ func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *para
|
||||
return receipts
|
||||
}
|
||||
|
||||
// ReadStorageReceipts is a modification of ReadRawReceipts that skips a conversion to types.Receipt
|
||||
// and returns the ReceipstForStorage instead
|
||||
func ReadStorageReceipts(db ethdb.Reader, hash common.Hash, number uint64) []*types.ReceiptForStorage {
|
||||
// Retrieve the flattened receipt slice
|
||||
data := ReadReceiptsRLP(db, hash, number)
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Convert the receipts from their storage form to their internal representation
|
||||
var storageReceipts []*types.ReceiptForStorage
|
||||
if err := rlp.DecodeBytes(data, &storageReceipts); err != nil {
|
||||
log.Error("Invalid receipt array RLP", "hash", hash, "err", err)
|
||||
return nil
|
||||
}
|
||||
return storageReceipts
|
||||
}
|
||||
|
||||
// WriteReceipts stores all the transaction receipts belonging to a block.
|
||||
func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts types.Receipts) {
|
||||
// Convert the receipts into their storage form and serialize them
|
||||
@ -817,6 +834,17 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts
|
||||
})
|
||||
}
|
||||
|
||||
// WriteAncientBlock is a public wrapper of writeAncientBlock
|
||||
func WriteAncientBlock(op ethdb.AncientWriteOp, block *types.Block, receipts types.Receipts, td *big.Int) error {
|
||||
stReceipts := make([]*types.ReceiptForStorage, len(receipts))
|
||||
// Convert receipts to storage format and sum up total difficulty.
|
||||
stReceipts = stReceipts[:0]
|
||||
for i, receipt := range receipts {
|
||||
stReceipts[i] = (*types.ReceiptForStorage)(receipt)
|
||||
}
|
||||
return writeAncientBlock(op, block, block.Header(), stReceipts, td)
|
||||
}
|
||||
|
||||
func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error {
|
||||
num := block.NumberU64()
|
||||
if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil {
|
||||
|
@ -34,8 +34,8 @@ type Filter struct {
|
||||
addresses []common.Address
|
||||
topics [][]common.Hash
|
||||
|
||||
block common.Hash // Block hash if filtering a single block
|
||||
begin, end int64 // Range interval if filtering multiple blocks
|
||||
block *common.Hash // Block hash if filtering a single block
|
||||
begin, end int64 // Range interval if filtering multiple blocks
|
||||
|
||||
matcher *bloombits.Matcher
|
||||
}
|
||||
@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add
|
||||
func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter {
|
||||
// Create a generic filter and convert it into a block filter
|
||||
filter := newFilter(sys, addresses, topics)
|
||||
filter.block = block
|
||||
filter.block = &block
|
||||
return filter
|
||||
}
|
||||
|
||||
@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common.
|
||||
// first block that contains matches, updating the start of the filter accordingly.
|
||||
func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
|
||||
// If we're doing singleton block filtering, execute and return
|
||||
if f.block != (common.Hash{}) {
|
||||
header, err := f.sys.backend.HeaderByHash(ctx, f.block)
|
||||
if f.block != nil {
|
||||
header, err := f.sys.backend.HeaderByHash(ctx, *f.block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
||||
if h.checkpointHash != (common.Hash{}) {
|
||||
// Request the peer's checkpoint header for chain height/weight validation
|
||||
resCh := make(chan *eth.Response)
|
||||
if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil {
|
||||
|
||||
req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Start a timer to disconnect if the peer doesn't reply in time
|
||||
go func() {
|
||||
// Ensure the request gets cancelled in case of error/drop
|
||||
defer req.Close()
|
||||
|
||||
timeout := time.NewTimer(syncChallengeTimeout)
|
||||
defer timeout.Stop()
|
||||
|
||||
@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
||||
// If we have any explicit peer required block hashes, request them
|
||||
for number, hash := range h.requiredBlocks {
|
||||
resCh := make(chan *eth.Response)
|
||||
if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil {
|
||||
|
||||
req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func(number uint64, hash common.Hash) {
|
||||
go func(number uint64, hash common.Hash, req *eth.Request) {
|
||||
// Ensure the request gets cancelled in case of error/drop
|
||||
defer req.Close()
|
||||
|
||||
timeout := time.NewTimer(syncChallengeTimeout)
|
||||
defer timeout.Stop()
|
||||
|
||||
@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error {
|
||||
peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name())
|
||||
h.removePeer(peer.ID())
|
||||
}
|
||||
}(number, hash)
|
||||
}(number, hash, req)
|
||||
}
|
||||
// Handle incoming messages until the connection is torn down
|
||||
return handler(peer)
|
||||
|
@ -21,10 +21,12 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
gomath "math"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
@ -78,6 +80,29 @@ const (
|
||||
// and waste round trip times. If it's too high, we're capping responses and
|
||||
// waste bandwidth.
|
||||
maxTrieRequestCount = maxRequestSize / 512
|
||||
|
||||
// trienodeHealRateMeasurementImpact is the impact a single measurement has on
|
||||
// the local node's trienode processing capacity. A value closer to 0 reacts
|
||||
// slower to sudden changes, but it is also more stable against temporary hiccups.
|
||||
trienodeHealRateMeasurementImpact = 0.005
|
||||
|
||||
// minTrienodeHealThrottle is the minimum divisor for throttling trie node
|
||||
// heal requests to avoid overloading the local node and exessively expanding
|
||||
// the state trie bedth wise.
|
||||
minTrienodeHealThrottle = 1
|
||||
|
||||
// maxTrienodeHealThrottle is the maximum divisor for throttling trie node
|
||||
// heal requests to avoid overloading the local node and exessively expanding
|
||||
// the state trie bedth wise.
|
||||
maxTrienodeHealThrottle = maxTrieRequestCount
|
||||
|
||||
// trienodeHealThrottleIncrease is the multiplier for the throttle when the
|
||||
// rate of arriving data is higher than the rate of processing it.
|
||||
trienodeHealThrottleIncrease = 1.33
|
||||
|
||||
// trienodeHealThrottleDecrease is the divisor for the throttle when the
|
||||
// rate of arriving data is lower than the rate of processing it.
|
||||
trienodeHealThrottleDecrease = 1.25
|
||||
)
|
||||
|
||||
var (
|
||||
@ -431,6 +456,11 @@ type Syncer struct {
|
||||
trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running
|
||||
bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running
|
||||
|
||||
trienodeHealRate float64 // Average heal rate for processing trie node data
|
||||
trienodeHealPend uint64 // Number of trie nodes currently pending for processing
|
||||
trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested
|
||||
trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated
|
||||
|
||||
trienodeHealSynced uint64 // Number of state trie nodes downloaded
|
||||
trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk
|
||||
trienodeHealDups uint64 // Number of state trie nodes already processed
|
||||
@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer {
|
||||
trienodeHealIdlers: make(map[string]struct{}),
|
||||
bytecodeHealIdlers: make(map[string]struct{}),
|
||||
|
||||
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
|
||||
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
|
||||
stateWriter: db.NewBatch(),
|
||||
trienodeHealReqs: make(map[uint64]*trienodeHealRequest),
|
||||
bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest),
|
||||
trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk
|
||||
stateWriter: db.NewBatch(),
|
||||
|
||||
extProgress: new(SyncProgress),
|
||||
}
|
||||
@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai
|
||||
if cap > maxTrieRequestCount {
|
||||
cap = maxTrieRequestCount
|
||||
}
|
||||
cap = int(float64(cap) / s.trienodeHealThrottle)
|
||||
if cap <= 0 {
|
||||
cap = 1
|
||||
}
|
||||
var (
|
||||
hashes = make([]common.Hash, 0, cap)
|
||||
paths = make([]string, 0, cap)
|
||||
@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) {
|
||||
// processTrienodeHealResponse integrates an already validated trienode response
|
||||
// into the healer tasks.
|
||||
func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
||||
var (
|
||||
start = time.Now()
|
||||
fills int
|
||||
)
|
||||
for i, hash := range res.hashes {
|
||||
node := res.nodes[i]
|
||||
|
||||
@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
||||
res.task.trieTasks[res.paths[i]] = res.hashes[i]
|
||||
continue
|
||||
}
|
||||
fills++
|
||||
|
||||
// Push the trie node into the state syncer
|
||||
s.trienodeHealSynced++
|
||||
s.trienodeHealBytes += common.StorageSize(len(node))
|
||||
@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) {
|
||||
log.Crit("Failed to persist healing data", "err", err)
|
||||
}
|
||||
log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize()))
|
||||
|
||||
// Calculate the processing rate of one filled trie node
|
||||
rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second))
|
||||
|
||||
// Update the currently measured trienode queueing and processing throughput.
|
||||
//
|
||||
// The processing rate needs to be updated uniformly independent if we've
|
||||
// processed 1x100 trie nodes or 100x1 to keep the rate consistent even in
|
||||
// the face of varying network packets. As such, we cannot just measure the
|
||||
// time it took to process N trie nodes and update once, we need one update
|
||||
// per trie node.
|
||||
//
|
||||
// Naively, that would be:
|
||||
//
|
||||
// for i:=0; i<fills; i++ {
|
||||
// healRate = (1-measurementImpact)*oldRate + measurementImpact*newRate
|
||||
// }
|
||||
//
|
||||
// Essentially, a recursive expansion of HR = (1-MI)*HR + MI*NR.
|
||||
//
|
||||
// We can expand that formula for the Nth item as:
|
||||
// HR(N) = (1-MI)^N*OR + (1-MI)^(N-1)*MI*NR + (1-MI)^(N-2)*MI*NR + ... + (1-MI)^0*MI*NR
|
||||
//
|
||||
// The above is a geometric sequence that can be summed to:
|
||||
// HR(N) = (1-MI)^N*(OR-NR) + NR
|
||||
s.trienodeHealRate = gomath.Pow(1-trienodeHealRateMeasurementImpact, float64(fills))*(s.trienodeHealRate-rate) + rate
|
||||
|
||||
pending := atomic.LoadUint64(&s.trienodeHealPend)
|
||||
if time.Since(s.trienodeHealThrottled) > time.Second {
|
||||
// Periodically adjust the trie node throttler
|
||||
if float64(pending) > 2*s.trienodeHealRate {
|
||||
s.trienodeHealThrottle *= trienodeHealThrottleIncrease
|
||||
} else {
|
||||
s.trienodeHealThrottle /= trienodeHealThrottleDecrease
|
||||
}
|
||||
if s.trienodeHealThrottle > maxTrienodeHealThrottle {
|
||||
s.trienodeHealThrottle = maxTrienodeHealThrottle
|
||||
} else if s.trienodeHealThrottle < minTrienodeHealThrottle {
|
||||
s.trienodeHealThrottle = minTrienodeHealThrottle
|
||||
}
|
||||
s.trienodeHealThrottled = time.Now()
|
||||
|
||||
log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle)
|
||||
}
|
||||
}
|
||||
|
||||
// processBytecodeHealResponse integrates an already validated bytecode response
|
||||
@ -2248,14 +2333,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco
|
||||
// Whether or not the response is valid, we can mark the peer as idle and
|
||||
// notify the scheduler to assign a new task. If the response is invalid,
|
||||
// we'll drop the peer in a bit.
|
||||
defer func() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.accountIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
s.lock.Lock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.accountIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// Ensure the response is for a valid request
|
||||
req, ok := s.accountReqs[id]
|
||||
if !ok {
|
||||
@ -2360,14 +2449,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error
|
||||
// Whether or not the response is valid, we can mark the peer as idle and
|
||||
// notify the scheduler to assign a new task. If the response is invalid,
|
||||
// we'll drop the peer in a bit.
|
||||
defer func() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.bytecodeIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
s.lock.Lock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.bytecodeIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// Ensure the response is for a valid request
|
||||
req, ok := s.bytecodeReqs[id]
|
||||
if !ok {
|
||||
@ -2469,14 +2562,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo
|
||||
// Whether or not the response is valid, we can mark the peer as idle and
|
||||
// notify the scheduler to assign a new task. If the response is invalid,
|
||||
// we'll drop the peer in a bit.
|
||||
defer func() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.storageIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
s.lock.Lock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.storageIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// Ensure the response is for a valid request
|
||||
req, ok := s.storageReqs[id]
|
||||
if !ok {
|
||||
@ -2596,14 +2693,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
||||
// Whether or not the response is valid, we can mark the peer as idle and
|
||||
// notify the scheduler to assign a new task. If the response is invalid,
|
||||
// we'll drop the peer in a bit.
|
||||
defer func() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.trienodeHealIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
s.lock.Lock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.trienodeHealIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// Ensure the response is for a valid request
|
||||
req, ok := s.trienodeHealReqs[id]
|
||||
if !ok {
|
||||
@ -2639,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
||||
|
||||
// Cross reference the requested trienodes with the response to find gaps
|
||||
// that the serving node is missing
|
||||
hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState)
|
||||
hash := make([]byte, 32)
|
||||
|
||||
nodes := make([][]byte, len(req.hashes))
|
||||
var (
|
||||
hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState)
|
||||
hash = make([]byte, 32)
|
||||
nodes = make([][]byte, len(req.hashes))
|
||||
fills uint64
|
||||
)
|
||||
for i, j := 0, 0; i < len(trienodes); i++ {
|
||||
// Find the next hash that we've been served, leaving misses with nils
|
||||
hasher.Reset()
|
||||
@ -2654,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error
|
||||
}
|
||||
if j < len(req.hashes) {
|
||||
nodes[j] = trienodes[i]
|
||||
fills++
|
||||
j++
|
||||
continue
|
||||
}
|
||||
// We've either ran out of hashes, or got unrequested data
|
||||
logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i)
|
||||
|
||||
// Signal this request as failed, and ready for rescheduling
|
||||
s.scheduleRevertTrienodeHealRequest(req)
|
||||
return errors.New("unexpected healing trienode")
|
||||
}
|
||||
// Response validated, send it to the scheduler for filling
|
||||
atomic.AddUint64(&s.trienodeHealPend, fills)
|
||||
defer func() {
|
||||
atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1))
|
||||
}()
|
||||
response := &trienodeHealResponse{
|
||||
paths: req.paths,
|
||||
task: req.task,
|
||||
@ -2691,14 +2800,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e
|
||||
// Whether or not the response is valid, we can mark the peer as idle and
|
||||
// notify the scheduler to assign a new task. If the response is invalid,
|
||||
// we'll drop the peer in a bit.
|
||||
defer func() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
s.lock.Lock()
|
||||
if _, ok := s.peers[peer.ID()]; ok {
|
||||
s.bytecodeHealIdlers[peer.ID()] = struct{}{}
|
||||
}
|
||||
select {
|
||||
case s.update <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
// Ensure the response is for a valid request
|
||||
req, ok := s.bytecodeHealReqs[id]
|
||||
if !ok {
|
||||
|
1
go.mod
1
go.mod
@ -146,4 +146,5 @@ require (
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
lukechampine.com/blake3 v1.1.6 // indirect
|
||||
|
||||
)
|
||||
|
13
go.sum
13
go.sum
@ -135,6 +135,7 @@ github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlK
|
||||
github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0=
|
||||
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 h1:IZqZOB2fydHte3kUgxrzK5E1fW7RQGeDwE8F/ZZnUYc=
|
||||
github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8=
|
||||
@ -501,19 +502,18 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz
|
||||
github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0=
|
||||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 h1:shk/vn9oCoOTmwcouEdwIeOtOGA/ELRUw/GwvxwfT+0=
|
||||
github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
|
||||
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
|
||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
|
||||
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
|
||||
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY=
|
||||
github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
|
||||
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
|
||||
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
|
||||
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ=
|
||||
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
|
||||
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
|
||||
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
|
||||
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
|
||||
@ -572,8 +572,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV
|
||||
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
|
||||
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
|
||||
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
@ -597,8 +595,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY=
|
||||
github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
|
||||
github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE=
|
||||
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
|
||||
github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk=
|
||||
@ -789,6 +785,7 @@ golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988/go.mod h1:h1NjWce9XRLGQEsW7w
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"github.com/ethereum/go-ethereum"
|
||||
@ -478,13 +479,16 @@ func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ret := make([]*Log, 0, len(logs))
|
||||
for _, log := range logs {
|
||||
var ret []*Log
|
||||
// Select tx logs from all block logs
|
||||
ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) >= t.index })
|
||||
for ix < len(logs) && uint64(logs[ix].TxIndex) == t.index {
|
||||
ret = append(ret, &Log{
|
||||
r: t.r,
|
||||
transaction: t,
|
||||
log: log,
|
||||
log: logs[ix],
|
||||
})
|
||||
ix++
|
||||
}
|
||||
return &ret, nil
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
package graphql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/big"
|
||||
@ -51,15 +53,21 @@ func TestBuildSchema(t *testing.T) {
|
||||
}
|
||||
defer stack.Close()
|
||||
// Make sure the schema can be parsed and matched up to the object model.
|
||||
if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil {
|
||||
if _, err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil {
|
||||
t.Errorf("Could not construct GraphQL handler: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint
|
||||
func TestGraphQLBlockSerialization(t *testing.T) {
|
||||
stack := createNode(t, true, false)
|
||||
stack := createNode(t)
|
||||
defer stack.Close()
|
||||
genesis := &core.Genesis{
|
||||
Config: params.AllEthashProtocolChanges,
|
||||
GasLimit: 11500000,
|
||||
Difficulty: big.NewInt(1048576),
|
||||
}
|
||||
newGQLService(t, stack, genesis, 10, func(i int, gen *core.BlockGen) {})
|
||||
// start node
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("could not start node: %v", err)
|
||||
@ -161,8 +169,55 @@ func TestGraphQLBlockSerialization(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGraphQLBlockSerializationEIP2718(t *testing.T) {
|
||||
stack := createNode(t, true, true)
|
||||
// Account for signing txes
|
||||
var (
|
||||
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
address = crypto.PubkeyToAddress(key.PublicKey)
|
||||
funds = big.NewInt(1000000000000000)
|
||||
dad = common.HexToAddress("0x0000000000000000000000000000000000000dad")
|
||||
)
|
||||
stack := createNode(t)
|
||||
defer stack.Close()
|
||||
genesis := &core.Genesis{
|
||||
Config: params.AllEthashProtocolChanges,
|
||||
GasLimit: 11500000,
|
||||
Difficulty: big.NewInt(1048576),
|
||||
Alloc: core.GenesisAlloc{
|
||||
address: {Balance: funds},
|
||||
// The address 0xdad sloads 0x00 and 0x01
|
||||
dad: {
|
||||
Code: []byte{byte(vm.PC), byte(vm.PC), byte(vm.SLOAD), byte(vm.SLOAD)},
|
||||
Nonce: 0,
|
||||
Balance: big.NewInt(0),
|
||||
},
|
||||
},
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
}
|
||||
signer := types.LatestSigner(genesis.Config)
|
||||
newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) {
|
||||
gen.SetCoinbase(common.Address{1})
|
||||
tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{
|
||||
Nonce: uint64(0),
|
||||
To: &dad,
|
||||
Value: big.NewInt(100),
|
||||
Gas: 50000,
|
||||
GasPrice: big.NewInt(params.InitialBaseFee),
|
||||
})
|
||||
gen.AddTx(tx)
|
||||
tx, _ = types.SignNewTx(key, signer, &types.AccessListTx{
|
||||
ChainID: genesis.Config.ChainID,
|
||||
Nonce: uint64(1),
|
||||
To: &dad,
|
||||
Gas: 30000,
|
||||
GasPrice: big.NewInt(params.InitialBaseFee),
|
||||
Value: big.NewInt(50),
|
||||
AccessList: types.AccessList{{
|
||||
Address: dad,
|
||||
StorageKeys: []common.Hash{{0}},
|
||||
}},
|
||||
})
|
||||
gen.AddTx(tx)
|
||||
})
|
||||
// start node
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("could not start node: %v", err)
|
||||
@ -198,7 +253,7 @@ func TestGraphQLBlockSerializationEIP2718(t *testing.T) {
|
||||
|
||||
// Tests that a graphQL request is not handled successfully when graphql is not enabled on the specified endpoint
|
||||
func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
|
||||
stack := createNode(t, false, false)
|
||||
stack := createNode(t)
|
||||
defer stack.Close()
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("could not start node: %v", err)
|
||||
@ -212,7 +267,59 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) {
|
||||
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||
}
|
||||
|
||||
func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node {
|
||||
func TestGraphQLTransactionLogs(t *testing.T) {
|
||||
var (
|
||||
key, _ = crypto.GenerateKey()
|
||||
addr = crypto.PubkeyToAddress(key.PublicKey)
|
||||
dadStr = "0x0000000000000000000000000000000000000dad"
|
||||
dad = common.HexToAddress(dadStr)
|
||||
genesis = &core.Genesis{
|
||||
Config: params.AllEthashProtocolChanges,
|
||||
GasLimit: 11500000,
|
||||
Difficulty: big.NewInt(1048576),
|
||||
Alloc: core.GenesisAlloc{
|
||||
addr: {Balance: big.NewInt(params.Ether)},
|
||||
dad: {
|
||||
// LOG0(0, 0), LOG0(0, 0), RETURN(0, 0)
|
||||
Code: common.Hex2Bytes("60006000a060006000a060006000f3"),
|
||||
Nonce: 0,
|
||||
Balance: big.NewInt(0),
|
||||
},
|
||||
},
|
||||
}
|
||||
signer = types.LatestSigner(genesis.Config)
|
||||
stack = createNode(t)
|
||||
)
|
||||
defer stack.Close()
|
||||
|
||||
handler := newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) {
|
||||
tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
|
||||
gen.AddTx(tx)
|
||||
tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 1, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
|
||||
gen.AddTx(tx)
|
||||
tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 2, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)})
|
||||
gen.AddTx(tx)
|
||||
})
|
||||
// start node
|
||||
if err := stack.Start(); err != nil {
|
||||
t.Fatalf("could not start node: %v", err)
|
||||
}
|
||||
query := `{block { transactions { logs { account { address } } } } }`
|
||||
res := handler.Schema.Exec(context.Background(), query, "", map[string]interface{}{})
|
||||
if res.Errors != nil {
|
||||
t.Fatalf("graphql query failed: %v", res.Errors)
|
||||
}
|
||||
have, err := json.Marshal(res.Data)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to encode graphql response: %s", err)
|
||||
}
|
||||
want := fmt.Sprintf(`{"block":{"transactions":[{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]}]}}`, dadStr, dadStr, dadStr, dadStr, dadStr, dadStr)
|
||||
if string(have) != want {
|
||||
t.Errorf("response unmatch. expected %s, got %s", want, have)
|
||||
}
|
||||
}
|
||||
|
||||
func createNode(t *testing.T) *node.Node {
|
||||
stack, err := node.New(&node.Config{
|
||||
HTTPHost: "127.0.0.1",
|
||||
HTTPPort: 0,
|
||||
@ -222,25 +329,12 @@ func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node {
|
||||
if err != nil {
|
||||
t.Fatalf("could not create node: %v", err)
|
||||
}
|
||||
if !gqlEnabled {
|
||||
return stack
|
||||
}
|
||||
if !txEnabled {
|
||||
createGQLService(t, stack)
|
||||
} else {
|
||||
createGQLServiceWithTransactions(t, stack)
|
||||
}
|
||||
return stack
|
||||
}
|
||||
|
||||
func createGQLService(t *testing.T, stack *node.Node) {
|
||||
// create backend
|
||||
func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlocks int, genfunc func(i int, gen *core.BlockGen)) *handler {
|
||||
ethConf := ðconfig.Config{
|
||||
Genesis: &core.Genesis{
|
||||
Config: params.AllEthashProtocolChanges,
|
||||
GasLimit: 11500000,
|
||||
Difficulty: big.NewInt(1048576),
|
||||
},
|
||||
Genesis: gspec,
|
||||
Ethash: ethash.Config{
|
||||
PowMode: ethash.ModeFake,
|
||||
},
|
||||
@ -258,101 +352,16 @@ func createGQLService(t *testing.T, stack *node.Node) {
|
||||
}
|
||||
// Create some blocks and import them
|
||||
chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(),
|
||||
ethash.NewFaker(), ethBackend.ChainDb(), 10, func(i int, gen *core.BlockGen) {})
|
||||
ethash.NewFaker(), ethBackend.ChainDb(), genBlocks, genfunc)
|
||||
_, err = ethBackend.BlockChain().InsertChain(chain)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create import blocks: %v", err)
|
||||
}
|
||||
// create gql service
|
||||
// Set up handler
|
||||
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
|
||||
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
|
||||
if err != nil {
|
||||
t.Fatalf("could not create graphql service: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) {
|
||||
// create backend
|
||||
key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||
address := crypto.PubkeyToAddress(key.PublicKey)
|
||||
funds := big.NewInt(1000000000000000)
|
||||
dad := common.HexToAddress("0x0000000000000000000000000000000000000dad")
|
||||
|
||||
ethConf := ðconfig.Config{
|
||||
Genesis: &core.Genesis{
|
||||
Config: params.AllEthashProtocolChanges,
|
||||
GasLimit: 11500000,
|
||||
Difficulty: big.NewInt(1048576),
|
||||
Alloc: core.GenesisAlloc{
|
||||
address: {Balance: funds},
|
||||
// The address 0xdad sloads 0x00 and 0x01
|
||||
dad: {
|
||||
Code: []byte{
|
||||
byte(vm.PC),
|
||||
byte(vm.PC),
|
||||
byte(vm.SLOAD),
|
||||
byte(vm.SLOAD),
|
||||
},
|
||||
Nonce: 0,
|
||||
Balance: big.NewInt(0),
|
||||
},
|
||||
},
|
||||
BaseFee: big.NewInt(params.InitialBaseFee),
|
||||
},
|
||||
Ethash: ethash.Config{
|
||||
PowMode: ethash.ModeFake,
|
||||
},
|
||||
NetworkId: 1337,
|
||||
TrieCleanCache: 5,
|
||||
TrieCleanCacheJournal: "triecache",
|
||||
TrieCleanCacheRejournal: 60 * time.Minute,
|
||||
TrieDirtyCache: 5,
|
||||
TrieTimeout: 60 * time.Minute,
|
||||
SnapshotCache: 5,
|
||||
}
|
||||
|
||||
ethBackend, err := eth.New(stack, ethConf)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create eth backend: %v", err)
|
||||
}
|
||||
signer := types.LatestSigner(ethConf.Genesis.Config)
|
||||
|
||||
legacyTx, _ := types.SignNewTx(key, signer, &types.LegacyTx{
|
||||
Nonce: uint64(0),
|
||||
To: &dad,
|
||||
Value: big.NewInt(100),
|
||||
Gas: 50000,
|
||||
GasPrice: big.NewInt(params.InitialBaseFee),
|
||||
})
|
||||
envelopTx, _ := types.SignNewTx(key, signer, &types.AccessListTx{
|
||||
ChainID: ethConf.Genesis.Config.ChainID,
|
||||
Nonce: uint64(1),
|
||||
To: &dad,
|
||||
Gas: 30000,
|
||||
GasPrice: big.NewInt(params.InitialBaseFee),
|
||||
Value: big.NewInt(50),
|
||||
AccessList: types.AccessList{{
|
||||
Address: dad,
|
||||
StorageKeys: []common.Hash{{0}},
|
||||
}},
|
||||
})
|
||||
|
||||
// Create some blocks and import them
|
||||
chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(),
|
||||
ethash.NewFaker(), ethBackend.ChainDb(), 1, func(i int, b *core.BlockGen) {
|
||||
b.SetCoinbase(common.Address{1})
|
||||
b.AddTx(legacyTx)
|
||||
b.AddTx(envelopTx)
|
||||
})
|
||||
|
||||
_, err = ethBackend.BlockChain().InsertChain(chain)
|
||||
if err != nil {
|
||||
t.Fatalf("could not create import blocks: %v", err)
|
||||
}
|
||||
// create gql service
|
||||
filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{})
|
||||
err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
|
||||
handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{})
|
||||
if err != nil {
|
||||
t.Fatalf("could not create graphql service: %v", err)
|
||||
}
|
||||
return handler
|
||||
}
|
||||
|
@ -57,17 +57,18 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// New constructs a new GraphQL service instance.
|
||||
func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
|
||||
return newHandler(stack, backend, filterSystem, cors, vhosts)
|
||||
_, err := newHandler(stack, backend, filterSystem, cors, vhosts)
|
||||
return err
|
||||
}
|
||||
|
||||
// newHandler returns a new `http.Handler` that will answer GraphQL queries.
|
||||
// It additionally exports an interactive query browser on the / endpoint.
|
||||
func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error {
|
||||
func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) (*handler, error) {
|
||||
q := Resolver{backend, filterSystem}
|
||||
|
||||
s, err := graphql.ParseSchema(schema, &q)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
h := handler{Schema: s}
|
||||
handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil)
|
||||
@ -76,5 +77,5 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.
|
||||
stack.RegisterHandler("GraphQL", "/graphql", handler)
|
||||
stack.RegisterHandler("GraphQL", "/graphql/", handler)
|
||||
|
||||
return nil
|
||||
return &h, nil
|
||||
}
|
||||
|
@ -59,25 +59,26 @@ var (
|
||||
|
||||
// MainnetChainConfig is the chain parameters to run a node on the main network.
|
||||
MainnetChainConfig = &ChainConfig{
|
||||
ChainID: big.NewInt(1),
|
||||
HomesteadBlock: big.NewInt(1_150_000),
|
||||
DAOForkBlock: big.NewInt(1_920_000),
|
||||
DAOForkSupport: true,
|
||||
EIP150Block: big.NewInt(2_463_000),
|
||||
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
|
||||
EIP155Block: big.NewInt(2_675_000),
|
||||
EIP158Block: big.NewInt(2_675_000),
|
||||
ByzantiumBlock: big.NewInt(4_370_000),
|
||||
ConstantinopleBlock: big.NewInt(7_280_000),
|
||||
PetersburgBlock: big.NewInt(7_280_000),
|
||||
IstanbulBlock: big.NewInt(9_069_000),
|
||||
MuirGlacierBlock: big.NewInt(9_200_000),
|
||||
BerlinBlock: big.NewInt(12_244_000),
|
||||
LondonBlock: big.NewInt(12_965_000),
|
||||
ArrowGlacierBlock: big.NewInt(13_773_000),
|
||||
GrayGlacierBlock: big.NewInt(15_050_000),
|
||||
TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000
|
||||
Ethash: new(EthashConfig),
|
||||
ChainID: big.NewInt(1),
|
||||
HomesteadBlock: big.NewInt(1_150_000),
|
||||
DAOForkBlock: big.NewInt(1_920_000),
|
||||
DAOForkSupport: true,
|
||||
EIP150Block: big.NewInt(2_463_000),
|
||||
EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"),
|
||||
EIP155Block: big.NewInt(2_675_000),
|
||||
EIP158Block: big.NewInt(2_675_000),
|
||||
ByzantiumBlock: big.NewInt(4_370_000),
|
||||
ConstantinopleBlock: big.NewInt(7_280_000),
|
||||
PetersburgBlock: big.NewInt(7_280_000),
|
||||
IstanbulBlock: big.NewInt(9_069_000),
|
||||
MuirGlacierBlock: big.NewInt(9_200_000),
|
||||
BerlinBlock: big.NewInt(12_244_000),
|
||||
LondonBlock: big.NewInt(12_965_000),
|
||||
ArrowGlacierBlock: big.NewInt(13_773_000),
|
||||
GrayGlacierBlock: big.NewInt(15_050_000),
|
||||
TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000
|
||||
TerminalTotalDifficultyPassed: true,
|
||||
Ethash: new(EthashConfig),
|
||||
}
|
||||
|
||||
// MainnetTrustedCheckpoint contains the light client trusted checkpoint for the main network.
|
||||
|
@ -23,8 +23,8 @@ import (
|
||||
const (
|
||||
VersionMajor = 1 // Major version component of the current release
|
||||
VersionMinor = 10 // Minor version component of the current release
|
||||
VersionPatch = 23 // Patch version component of the current release
|
||||
VersionMeta = "statediff-4.2.0-alpha" // Version metadata to append to the version string
|
||||
VersionPatch = 26 // Patch version component of the current release
|
||||
VersionMeta = "statediff-4.3.3-alpha" // Version metadata to append to the version string
|
||||
)
|
||||
|
||||
// Version holds the textual version string.
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrBadResult = errors.New("bad result in JSON-RPC response")
|
||||
ErrClientQuit = errors.New("client is closed")
|
||||
ErrNoResult = errors.New("no result in JSON-RPC response")
|
||||
ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow")
|
||||
|
@ -19,6 +19,7 @@ package rpc
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientBatchRequest_len(t *testing.T) {
|
||||
b, err := json.Marshal([]jsonrpcMessage{
|
||||
{Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)},
|
||||
{Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("failed to encode jsonrpc message:", err)
|
||||
}
|
||||
s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
_, err := rw.Write(b)
|
||||
if err != nil {
|
||||
t.Error("failed to write response:", err)
|
||||
}
|
||||
}))
|
||||
t.Cleanup(s.Close)
|
||||
|
||||
client, err := Dial(s.URL)
|
||||
if err != nil {
|
||||
t.Fatal("failed to dial test server:", err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
t.Run("too-few", func(t *testing.T) {
|
||||
batch := []BatchElem{
|
||||
{Method: "foo"},
|
||||
{Method: "bar"},
|
||||
{Method: "baz"},
|
||||
}
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancelFn()
|
||||
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
|
||||
t.Errorf("expected %q but got: %v", ErrBadResult, err)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("too-many", func(t *testing.T) {
|
||||
batch := []BatchElem{
|
||||
{Method: "foo"},
|
||||
}
|
||||
ctx, cancelFn := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancelFn()
|
||||
if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) {
|
||||
t.Errorf("expected %q but got: %v", ErrBadResult, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestClientNotify(t *testing.T) {
|
||||
server := newTestServer()
|
||||
defer server.Stop()
|
||||
|
@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr
|
||||
if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(respmsgs) != len(msgs) {
|
||||
return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult)
|
||||
}
|
||||
for i := 0; i < len(respmsgs); i++ {
|
||||
op.resp <- &respmsgs[i]
|
||||
}
|
||||
|
@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff`
|
||||
|
||||
`--statediff.db.clientname` is the client name to use in the Postgres database
|
||||
|
||||
`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data
|
||||
|
||||
`--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode
|
||||
|
||||
`--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode
|
||||
|
@ -141,16 +141,73 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN
|
||||
}
|
||||
|
||||
// WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight
|
||||
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error {
|
||||
func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID {
|
||||
var err error
|
||||
start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber)
|
||||
defer countApiRequestEnd(start, logger, err)
|
||||
|
||||
return api.sds.WriteStateDiffAt(blockNumber, params)
|
||||
}
|
||||
|
||||
// WriteStateDiffFor writes a state diff object directly to DB for the specific block hash
|
||||
func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash common.Hash, params Params) error {
|
||||
return api.sds.WriteStateDiffFor(blockHash, params)
|
||||
var err error
|
||||
start, logger := countApiRequestBegin("writeStateDiffFor", blockHash.Hex())
|
||||
defer countApiRequestEnd(start, logger, err)
|
||||
|
||||
err = api.sds.WriteStateDiffFor(blockHash, params)
|
||||
return err
|
||||
}
|
||||
|
||||
// WatchAddress changes the list of watched addresses to which the direct indexing is restricted according to given operation
|
||||
func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error {
|
||||
return api.sds.WatchAddress(operation, args)
|
||||
}
|
||||
|
||||
// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff*
|
||||
func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) {
|
||||
// ensure that the RPC connection supports subscriptions
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
if !supported {
|
||||
return nil, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
// create subscription and start waiting for events
|
||||
rpcSub := notifier.CreateSubscription()
|
||||
|
||||
go func() {
|
||||
// subscribe to events from the statediff service
|
||||
statusChan := make(chan JobStatus, chainEventChanSize)
|
||||
quitChan := make(chan bool, 1)
|
||||
api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan)
|
||||
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil {
|
||||
log.Error("Failed to unsubscribe from job status stream: " + err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
// loop and await payloads and relay them to the subscriber with the notifier
|
||||
for {
|
||||
select {
|
||||
case status := <-statusChan:
|
||||
if err = notifier.Notify(rpcSub.ID, status); err != nil {
|
||||
log.Error("Failed to send job status; error: " + err.Error())
|
||||
return
|
||||
}
|
||||
case err = <-rpcSub.Err():
|
||||
if err != nil {
|
||||
log.Error("State diff service rpcSub error: " + err.Error())
|
||||
return
|
||||
}
|
||||
case <-quitChan:
|
||||
// don't need to unsubscribe, service does so before sending the quit signal
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return rpcSub, nil
|
||||
}
|
||||
|
@ -727,7 +727,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new
|
||||
if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) {
|
||||
return nil
|
||||
}
|
||||
log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex())
|
||||
log.Trace("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex())
|
||||
oldTrie, err := sdb.StateCache.OpenTrie(oldSR)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -30,8 +30,6 @@ import (
|
||||
type Config struct {
|
||||
// The configuration used for the stateDiff Indexer
|
||||
IndexerConfig interfaces.Config
|
||||
// The filepath to write knownGaps insert statements if we can't connect to the DB.
|
||||
KnownGapsFilePath string
|
||||
// A unique ID used for this service
|
||||
ID string
|
||||
// Name for the client this service is running
|
||||
|
@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n
|
||||
default:
|
||||
return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver)
|
||||
}
|
||||
db := postgres.NewPostgresDB(driver)
|
||||
db := postgres.NewPostgresDB(driver, pgc.Upsert)
|
||||
ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db)
|
||||
return db, ind, err
|
||||
case shared.DUMP:
|
||||
|
@ -18,6 +18,7 @@ package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
||||
@ -42,6 +43,8 @@ type BatchTx struct {
|
||||
iplds chan models.IPLDModel
|
||||
ipldCache models.IPLDBatch
|
||||
removedCacheFlag *uint32
|
||||
// Tracks expected cache size and ensures cache is caught up before flush
|
||||
cacheWg sync.WaitGroup
|
||||
|
||||
submit func(blockTx *BatchTx, err error) error
|
||||
}
|
||||
@ -52,10 +55,21 @@ func (tx *BatchTx) Submit(err error) error {
|
||||
}
|
||||
|
||||
func (tx *BatchTx) flush() error {
|
||||
tx.cacheWg.Wait()
|
||||
_, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys),
|
||||
pq.Array(tx.ipldCache.Values))
|
||||
if err != nil {
|
||||
return err
|
||||
log.Debug(insertError{"public.blocks", err, tx.stm,
|
||||
struct {
|
||||
blockNumbers []string
|
||||
keys []string
|
||||
values [][]byte
|
||||
}{
|
||||
tx.ipldCache.BlockNumbers,
|
||||
tx.ipldCache.Keys,
|
||||
tx.ipldCache.Values,
|
||||
}}.Error())
|
||||
return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"}
|
||||
}
|
||||
tx.ipldCache = models.IPLDBatch{}
|
||||
return nil
|
||||
@ -69,6 +83,7 @@ func (tx *BatchTx) cache() {
|
||||
tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber)
|
||||
tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key)
|
||||
tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data)
|
||||
tx.cacheWg.Done()
|
||||
case <-tx.quit:
|
||||
tx.ipldCache = models.IPLDBatch{}
|
||||
return
|
||||
@ -77,6 +92,7 @@ func (tx *BatchTx) cache() {
|
||||
}
|
||||
|
||||
func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
Key: key,
|
||||
@ -85,6 +101,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) {
|
||||
}
|
||||
|
||||
func (tx *BatchTx) cacheIPLD(i node.Node) {
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(),
|
||||
@ -98,6 +115,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
|
||||
return "", "", err
|
||||
}
|
||||
prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String()
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
Key: prefixedKey,
|
||||
@ -109,6 +127,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error
|
||||
func (tx *BatchTx) cacheRemoved(key string, value []byte) {
|
||||
if atomic.LoadUint32(tx.removedCacheFlag) == 0 {
|
||||
atomic.StoreUint32(tx.removedCacheFlag, 1)
|
||||
tx.cacheWg.Add(1)
|
||||
tx.iplds <- models.IPLDModel{
|
||||
BlockNumber: tx.BlockNumber,
|
||||
Key: key,
|
||||
|
@ -122,11 +122,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip
|
||||
}
|
||||
t = time.Now()
|
||||
|
||||
// Begin new db tx for everything
|
||||
tx, err := sdi.dbWriter.db.Begin(sdi.ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Begin new DB tx for everything
|
||||
tx := NewDelayedTx(sdi.dbWriter.db)
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
@ -589,11 +586,8 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) {
|
||||
}
|
||||
|
||||
// InsertWatchedAddresses inserts the given addresses in the database
|
||||
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
tx, err := sdi.dbWriter.db.Begin(sdi.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) {
|
||||
tx := NewDelayedTx(sdi.dbWriter.db)
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
@ -617,11 +611,8 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA
|
||||
}
|
||||
|
||||
// RemoveWatchedAddresses removes the given watched addresses from the database
|
||||
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error {
|
||||
tx, err := sdi.dbWriter.db.Begin(sdi.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) {
|
||||
tx := NewDelayedTx(sdi.dbWriter.db)
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
@ -644,11 +635,8 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA
|
||||
}
|
||||
|
||||
// SetWatchedAddresses clears and inserts the given addresses in the database
|
||||
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error {
|
||||
tx, err := sdi.dbWriter.db.Begin(sdi.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) {
|
||||
tx := NewDelayedTx(sdi.dbWriter.db)
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(sdi.ctx, tx)
|
||||
|
55
statediff/indexer/database/sql/lazy_tx.go
Normal file
55
statediff/indexer/database/sql/lazy_tx.go
Normal file
@ -0,0 +1,55 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type DelayedTx struct {
|
||||
cache []cachedStmt
|
||||
db Database
|
||||
}
|
||||
type cachedStmt struct {
|
||||
sql string
|
||||
args []interface{}
|
||||
}
|
||||
|
||||
func NewDelayedTx(db Database) *DelayedTx {
|
||||
return &DelayedTx{db: db}
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow {
|
||||
return tx.db.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) {
|
||||
tx.cache = append(tx.cache, cachedStmt{sql, args})
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) Commit(ctx context.Context) error {
|
||||
base, err := tx.db.Begin(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
rollback(ctx, base)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
rollback(ctx, base)
|
||||
}
|
||||
}()
|
||||
for _, stmt := range tx.cache {
|
||||
_, err := base.Exec(ctx, stmt.sql, stmt.args...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
tx.cache = nil
|
||||
return base.Commit(ctx)
|
||||
}
|
||||
|
||||
func (tx *DelayedTx) Rollback(ctx context.Context) error {
|
||||
tx.cache = nil
|
||||
return nil
|
||||
}
|
@ -70,6 +70,7 @@ type Config struct {
|
||||
MaxConnIdleTime time.Duration
|
||||
MaxConnLifetime time.Duration
|
||||
ConnTimeout time.Duration
|
||||
LogStatements bool
|
||||
|
||||
// node info params
|
||||
ID string
|
||||
@ -77,6 +78,9 @@ type Config struct {
|
||||
|
||||
// driver type
|
||||
Driver DriverType
|
||||
|
||||
// toggle on/off upserts
|
||||
Upsert bool
|
||||
}
|
||||
|
||||
// Type satisfies interfaces.Config
|
||||
|
@ -26,21 +26,27 @@ const (
|
||||
)
|
||||
|
||||
// NewPostgresDB returns a postgres.DB using the provided driver
|
||||
func NewPostgresDB(driver sql.Driver) *DB {
|
||||
return &DB{driver}
|
||||
func NewPostgresDB(driver sql.Driver, upsert bool) *DB {
|
||||
return &DB{upsert, driver}
|
||||
}
|
||||
|
||||
// DB implements sql.Database using a configured driver and Postgres statement syntax
|
||||
type DB struct {
|
||||
upsert bool
|
||||
sql.Driver
|
||||
}
|
||||
|
||||
// InsertHeaderStm satisfies the sql.Statements interface
|
||||
// Stm == Statement
|
||||
func (db *DB) InsertHeaderStm() string {
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||
ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)`
|
||||
}
|
||||
return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
|
||||
ON CONFLICT (block_hash, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertUncleStm satisfies the sql.Statements interface
|
||||
@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string {
|
||||
|
||||
// InsertStateStm satisfies the sql.Statements interface
|
||||
func (db *DB) InsertStateStm() string {
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)`
|
||||
}
|
||||
return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (header_id, state_path, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertAccountStm satisfies the sql.Statements interface
|
||||
@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string {
|
||||
|
||||
// InsertStorageStm satisfies the sql.Statements interface
|
||||
func (db *DB) InsertStorageStm() string {
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
if db.upsert {
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)`
|
||||
}
|
||||
return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING`
|
||||
}
|
||||
|
||||
// InsertIPLDStm satisfies the sql.Statements interface
|
||||
|
61
statediff/indexer/database/sql/postgres/log_adapter.go
Normal file
61
statediff/indexer/database/sql/postgres/log_adapter.go
Normal file
@ -0,0 +1,61 @@
|
||||
// Copyright © 2023 Cerc
|
||||
|
||||
// This program is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Affero General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Affero General Public License for more details.
|
||||
|
||||
// You should have received a copy of the GNU Affero General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/jackc/pgx/v4"
|
||||
)
|
||||
|
||||
type LogAdapter struct {
|
||||
l log.Logger
|
||||
}
|
||||
|
||||
func NewLogAdapter(l log.Logger) *LogAdapter {
|
||||
return &LogAdapter{l: l}
|
||||
}
|
||||
|
||||
func (l *LogAdapter) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) {
|
||||
var logger log.Logger
|
||||
if data != nil {
|
||||
var args = make([]interface{}, 0)
|
||||
for key, value := range data {
|
||||
if value != nil {
|
||||
args = append(args, key, value)
|
||||
}
|
||||
}
|
||||
logger = l.l.New(args...)
|
||||
} else {
|
||||
logger = l.l
|
||||
}
|
||||
|
||||
switch level {
|
||||
case pgx.LogLevelTrace:
|
||||
logger.Trace(msg)
|
||||
case pgx.LogLevelDebug:
|
||||
logger.Debug(msg)
|
||||
case pgx.LogLevelInfo:
|
||||
logger.Info(msg)
|
||||
case pgx.LogLevelWarn:
|
||||
logger.Warn(msg)
|
||||
case pgx.LogLevelError:
|
||||
logger.Error(msg)
|
||||
default:
|
||||
logger.New("INVALID_PGX_LOG_LEVEL", level).Error(msg)
|
||||
}
|
||||
}
|
@ -20,6 +20,8 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
|
||||
"github.com/georgysavva/scany/pgxscan"
|
||||
"github.com/jackc/pgconn"
|
||||
"github.com/jackc/pgx/v4"
|
||||
@ -85,6 +87,11 @@ func MakeConfig(config Config) (*pgxpool.Config, error) {
|
||||
if config.MaxConnIdleTime != 0 {
|
||||
conf.MaxConnIdleTime = config.MaxConnIdleTime
|
||||
}
|
||||
|
||||
if config.LogStatements {
|
||||
conf.ConnConfig.Logger = NewLogAdapter(log.New())
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewPostgresDB(driver), nil
|
||||
return NewPostgresDB(driver, false), nil
|
||||
}
|
||||
|
||||
// SetupPGXDB is used to setup a pgx db for tests
|
||||
@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewPostgresDB(driver), nil
|
||||
return NewPostgresDB(driver, false), nil
|
||||
}
|
||||
|
@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error {
|
||||
header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom,
|
||||
header.Timestamp, header.MhKey, 1, header.Coinbase)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting header_cids entry: %v", err)
|
||||
return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header}
|
||||
}
|
||||
indexerMetrics.blocks.Inc(1)
|
||||
return nil
|
||||
@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error {
|
||||
_, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(),
|
||||
uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting uncle_cids entry: %v", err)
|
||||
return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error {
|
||||
transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src,
|
||||
transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting transaction_cids entry: %v", err)
|
||||
return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction}
|
||||
}
|
||||
indexerMetrics.transactions.Inc(1)
|
||||
return nil
|
||||
@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL
|
||||
accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address,
|
||||
accessListElement.StorageKeys)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting access_list_element entry: %v", err)
|
||||
return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement}
|
||||
}
|
||||
indexerMetrics.accessListEntries.Inc(1)
|
||||
return nil
|
||||
@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error {
|
||||
rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState,
|
||||
rct.PostStatus, rct.LogRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting receipt_cids entry: %w", err)
|
||||
return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct}
|
||||
}
|
||||
indexerMetrics.receipts.Inc(1)
|
||||
return nil
|
||||
@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error {
|
||||
log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1,
|
||||
log.Topic2, log.Topic3, log.Data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting logs entry: %w", err)
|
||||
return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log}
|
||||
}
|
||||
indexerMetrics.logs.Inc(1)
|
||||
}
|
||||
@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error {
|
||||
stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true,
|
||||
stateNode.MhKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting state_cids entry: %v", err)
|
||||
return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel
|
||||
stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance,
|
||||
stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting state_accounts entry: %v", err)
|
||||
return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err
|
||||
storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path,
|
||||
storageCID.NodeType, true, storageCID.MhKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error upserting storage_cids entry: %v", err)
|
||||
return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type insertError struct {
|
||||
table string
|
||||
err error
|
||||
stmt string
|
||||
arguments interface{}
|
||||
}
|
||||
|
||||
var _ error = insertError{}
|
||||
|
||||
func (dbe insertError) Error() string {
|
||||
return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v",
|
||||
dbe.table, dbe.err, dbe.stmt, dbe.arguments)
|
||||
}
|
||||
|
@ -26,6 +26,8 @@ const (
|
||||
namespace = "statediff"
|
||||
)
|
||||
|
||||
var defaultStatediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
|
||||
|
||||
// Build a fully qualified metric name
|
||||
func metricName(subsystem, name string) string {
|
||||
if name == "" {
|
||||
@ -58,6 +60,14 @@ type statediffMetricsHandles struct {
|
||||
knownGapErrorStart metrics.Gauge
|
||||
// A known gaps end block which had an error being written to the DB
|
||||
knownGapErrorEnd metrics.Gauge
|
||||
|
||||
apiRequests metrics.Counter
|
||||
apiRequestsUnderway metrics.Counter
|
||||
|
||||
failed metrics.Counter
|
||||
succeeded metrics.Counter
|
||||
underway metrics.Counter
|
||||
totalProcessingTime metrics.Gauge
|
||||
}
|
||||
|
||||
func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
|
||||
@ -71,6 +81,12 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
|
||||
knownGapEnd: metrics.NewGauge(),
|
||||
knownGapErrorStart: metrics.NewGauge(),
|
||||
knownGapErrorEnd: metrics.NewGauge(),
|
||||
apiRequests: metrics.NewCounter(),
|
||||
apiRequestsUnderway: metrics.NewCounter(),
|
||||
failed: metrics.NewCounter(),
|
||||
succeeded: metrics.NewCounter(),
|
||||
underway: metrics.NewCounter(),
|
||||
totalProcessingTime: metrics.NewGauge(),
|
||||
}
|
||||
subsys := "service"
|
||||
reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight)
|
||||
@ -82,5 +98,11 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles {
|
||||
reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd)
|
||||
reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart)
|
||||
reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd)
|
||||
reg.Register(metricName(subsys, "api_requests"), ctx.apiRequests)
|
||||
reg.Register(metricName(subsys, "api_requests_underway"), ctx.apiRequestsUnderway)
|
||||
reg.Register(metricName(subsys, "failed"), ctx.failed)
|
||||
reg.Register(metricName(subsys, "succeeded"), ctx.succeeded)
|
||||
reg.Register(metricName(subsys, "underway"), ctx.underway)
|
||||
reg.Register(metricName(subsys, "total_processing_time"), ctx.totalProcessingTime)
|
||||
return ctx
|
||||
}
|
||||
|
89
statediff/metrics_helpers.go
Normal file
89
statediff/metrics_helpers.go
Normal file
@ -0,0 +1,89 @@
|
||||
// Copyright 2019 The go-ethereum Authors
|
||||
// This file is part of the go-ethereum library.
|
||||
//
|
||||
// The go-ethereum library is free software: you can redistribute it and/or modify
|
||||
// it under the terms of the GNU Lesser General Public License as published by
|
||||
// the Free Software Foundation, either version 3 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// The go-ethereum library is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU Lesser General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU Lesser General Public License
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package statediff
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) {
|
||||
start := time.Now()
|
||||
logger := log.New("hash", block.Hash().Hex(), "number", block.NumberU64())
|
||||
|
||||
defaultStatediffMetrics.underway.Inc(1)
|
||||
logger.Debug(fmt.Sprintf("writeStateDiff BEGIN [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||
defaultStatediffMetrics.underway.Count(),
|
||||
defaultStatediffMetrics.succeeded.Count(),
|
||||
defaultStatediffMetrics.failed.Count(),
|
||||
defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
))
|
||||
|
||||
return start, logger
|
||||
}
|
||||
|
||||
func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration {
|
||||
duration := time.Since(start)
|
||||
defaultStatediffMetrics.underway.Dec(1)
|
||||
if nil == err {
|
||||
defaultStatediffMetrics.succeeded.Inc(1)
|
||||
} else {
|
||||
defaultStatediffMetrics.failed.Inc(1)
|
||||
}
|
||||
defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds())
|
||||
|
||||
logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]",
|
||||
duration.Milliseconds(), nil != err,
|
||||
defaultStatediffMetrics.underway.Count(),
|
||||
defaultStatediffMetrics.succeeded.Count(),
|
||||
defaultStatediffMetrics.failed.Count(),
|
||||
defaultStatediffMetrics.totalProcessingTime.Value(),
|
||||
))
|
||||
|
||||
return duration
|
||||
}
|
||||
|
||||
func countApiRequestBegin(methodName string, blockHashOrNumber interface{}) (time.Time, log.Logger) {
|
||||
start := time.Now()
|
||||
logger := log.New(methodName, blockHashOrNumber)
|
||||
|
||||
defaultStatediffMetrics.apiRequests.Inc(1)
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Inc(1)
|
||||
|
||||
logger.Debug(fmt.Sprintf("statediff API BEGIN [underway=%d, requests=%d])",
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
defaultStatediffMetrics.apiRequests.Count(),
|
||||
))
|
||||
|
||||
return start, logger
|
||||
}
|
||||
|
||||
func countApiRequestEnd(start time.Time, logger log.Logger, err error) time.Duration {
|
||||
duration := time.Since(start)
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Dec(1)
|
||||
|
||||
logger.Debug(fmt.Sprintf("statediff API END (duration=%dms, err=%t) [underway=%d, requests=%d]",
|
||||
duration.Milliseconds(), nil != err,
|
||||
defaultStatediffMetrics.apiRequestsUnderway.Count(),
|
||||
defaultStatediffMetrics.apiRequests.Count(),
|
||||
))
|
||||
|
||||
return duration
|
||||
}
|
@ -36,16 +36,13 @@ import (
|
||||
"github.com/ethereum/go-ethereum/event"
|
||||
"github.com/ethereum/go-ethereum/internal/ethapi"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/metrics"
|
||||
"github.com/ethereum/go-ethereum/node"
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
ind "github.com/ethereum/go-ethereum/statediff/indexer"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
|
||||
nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node"
|
||||
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
|
||||
types2 "github.com/ethereum/go-ethereum/statediff/types"
|
||||
"github.com/ethereum/go-ethereum/trie"
|
||||
"github.com/thoas/go-funk"
|
||||
@ -71,8 +68,6 @@ var writeLoopParams = ParamsWithMutex{
|
||||
},
|
||||
}
|
||||
|
||||
var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry)
|
||||
|
||||
type blockChain interface {
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
CurrentBlock() *types.Block
|
||||
@ -92,7 +87,7 @@ type IService interface {
|
||||
APIs() []rpc.API
|
||||
// Loop is the main event loop for processing state diffs
|
||||
Loop(chainEventCh chan core.ChainEvent)
|
||||
// Subscribe method to subscribe to receive state diff processing output`
|
||||
// Subscribe method to subscribe to receive state diff processing output
|
||||
Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params)
|
||||
// Unsubscribe method to unsubscribe from state diff processing
|
||||
Unsubscribe(id rpc.ID) error
|
||||
@ -105,13 +100,18 @@ type IService interface {
|
||||
// StreamCodeAndCodeHash method to stream out all code and codehash pairs
|
||||
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool)
|
||||
// WriteStateDiffAt method to write state diff object directly to DB
|
||||
WriteStateDiffAt(blockNumber uint64, params Params) error
|
||||
WriteStateDiffAt(blockNumber uint64, params Params) JobID
|
||||
// WriteStateDiffFor method to write state diff object directly to DB
|
||||
WriteStateDiffFor(blockHash common.Hash, params Params) error
|
||||
// WriteLoop event loop for progressively processing and writing diffs directly to DB
|
||||
WriteLoop(chainEventCh chan core.ChainEvent)
|
||||
// Method to change the addresses being watched in write loop params
|
||||
WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error
|
||||
|
||||
// SubscribeWriteStatus method to subscribe to receive state diff processing output
|
||||
SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool)
|
||||
// UnsubscribeWriteStatus method to unsubscribe from state diff processing
|
||||
UnsubscribeWriteStatus(id rpc.ID) error
|
||||
}
|
||||
|
||||
// Service is the underlying struct for the state diffing service
|
||||
@ -134,9 +134,7 @@ type Service struct {
|
||||
BackendAPI ethapi.Backend
|
||||
// Should the statediff service wait for geth to sync to head?
|
||||
WaitForSync bool
|
||||
// Used to signal if we should check for KnownGaps
|
||||
KnownGaps KnownGapsState
|
||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||
// Whether we have any subscribers
|
||||
subscribers int32
|
||||
// Interface for publishing statediffs as PG-IPLD objects
|
||||
indexer interfaces.StateDiffIndexer
|
||||
@ -146,6 +144,27 @@ type Service struct {
|
||||
numWorkers uint
|
||||
// Number of retry for aborted transactions due to deadlock.
|
||||
maxRetry uint
|
||||
// Write job status subscriptions
|
||||
jobStatusSubs map[rpc.ID]statusSubscription
|
||||
// Job ID ticker
|
||||
lastJobID uint64
|
||||
// In flight jobs (for WriteStateDiffAt)
|
||||
currentJobs map[uint64]JobID
|
||||
currentJobsMutex sync.Mutex
|
||||
}
|
||||
|
||||
// IDs used for tracking in-progress jobs (0 for invalid)
|
||||
type JobID uint64
|
||||
|
||||
// JobStatus represents the status of a completed job
|
||||
type JobStatus struct {
|
||||
id JobID
|
||||
err error
|
||||
}
|
||||
|
||||
type statusSubscription struct {
|
||||
statusChan chan<- JobStatus
|
||||
quitChan chan<- bool
|
||||
}
|
||||
|
||||
// BlockCache caches the last block for safe access from different service loops
|
||||
@ -167,7 +186,6 @@ func NewBlockCache(max uint) BlockCache {
|
||||
func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error {
|
||||
blockChain := ethServ.BlockChain()
|
||||
var indexer interfaces.StateDiffIndexer
|
||||
var db sql.Database
|
||||
var err error
|
||||
quitCh := make(chan bool)
|
||||
indexerConfigAvailable := params.IndexerConfig != nil
|
||||
@ -180,7 +198,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
ClientName: params.ClientName,
|
||||
}
|
||||
var err error
|
||||
db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
||||
_, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -191,25 +209,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
if workers == 0 {
|
||||
workers = 1
|
||||
}
|
||||
// If we ever have multiple processingKeys we can update them here
|
||||
// along with the expectedDifference
|
||||
knownGaps := &KnownGapsState{
|
||||
processingKey: 0,
|
||||
expectedDifference: big.NewInt(1),
|
||||
errorState: false,
|
||||
writeFilePath: params.KnownGapsFilePath,
|
||||
db: db,
|
||||
statediffMetrics: statediffMetrics,
|
||||
sqlFileWaitingForWrite: false,
|
||||
}
|
||||
if indexerConfigAvailable {
|
||||
if params.IndexerConfig.Type() == shared.POSTGRES {
|
||||
knownGaps.checkForGaps = true
|
||||
} else {
|
||||
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
|
||||
knownGaps.checkForGaps = false
|
||||
}
|
||||
}
|
||||
|
||||
sds := &Service{
|
||||
Mutex: sync.Mutex{},
|
||||
BlockChain: blockChain,
|
||||
@ -220,7 +220,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
|
||||
BlockCache: NewBlockCache(workers),
|
||||
BackendAPI: backend,
|
||||
WaitForSync: params.WaitForSync,
|
||||
KnownGaps: *knownGaps,
|
||||
indexer: indexer,
|
||||
enableWriteLoop: params.EnableWriteLoop,
|
||||
numWorkers: workers,
|
||||
@ -294,8 +293,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
|
||||
for {
|
||||
select {
|
||||
case chainEvent := <-chainEventCh:
|
||||
statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64()))
|
||||
statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
lastHeight := defaultStatediffMetrics.lastEventHeight.Value()
|
||||
nextHeight := int64(chainEvent.Block.Number().Uint64())
|
||||
if nextHeight-lastHeight != 1 {
|
||||
log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight)
|
||||
}
|
||||
defaultStatediffMetrics.lastEventHeight.Update(nextHeight)
|
||||
defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
chainEventFwd <- chainEvent
|
||||
case err := <-errCh:
|
||||
log.Error("Error from chain event subscription", "error", err)
|
||||
@ -333,7 +337,7 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint)
|
||||
genesisBlockNumber, "error", err.Error(), "worker", workerId)
|
||||
return
|
||||
}
|
||||
statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
||||
defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber)
|
||||
}
|
||||
|
||||
func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
@ -355,46 +359,21 @@ func (sds *Service) writeLoopWorker(params workerParams) {
|
||||
sds.writeGenesisStateDiff(parentBlock, params.id)
|
||||
}
|
||||
|
||||
// If for any reason we need to check for gaps,
|
||||
// Check and update the gaps table.
|
||||
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
|
||||
log.Info("Checking for Gaps at", "current block", currentBlock.Number())
|
||||
go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
|
||||
sds.KnownGaps.checkForGaps = false
|
||||
}
|
||||
|
||||
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
|
||||
writeLoopParams.RLock()
|
||||
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params)
|
||||
writeLoopParams.RUnlock()
|
||||
if err != nil {
|
||||
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
|
||||
sds.KnownGaps.errorState = true
|
||||
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "blockNumber", currentBlock.Number())
|
||||
sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
|
||||
// Write object to startdiff
|
||||
log.Error("statediff.Service.WriteLoop: processing error",
|
||||
"block height", currentBlock.Number().Uint64(),
|
||||
"block hash", currentBlock.Hash().Hex(),
|
||||
"error", err.Error(),
|
||||
"worker", params.id)
|
||||
continue
|
||||
}
|
||||
sds.KnownGaps.errorState = false
|
||||
if sds.KnownGaps.knownErrorBlocks != nil {
|
||||
// We must pass in parameters by VALUE not reference.
|
||||
// If we pass them in my reference, the references can change before the computation is complete!
|
||||
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
|
||||
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
|
||||
sds.KnownGaps.knownErrorBlocks = nil
|
||||
go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks)
|
||||
}
|
||||
|
||||
if sds.KnownGaps.sqlFileWaitingForWrite {
|
||||
log.Info("There are entries in the SQL file for knownGaps that should be written")
|
||||
err := sds.KnownGaps.writeSqlFileStmtToDb()
|
||||
if err != nil {
|
||||
log.Error("Unable to write KnownGap sql file to DB")
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: how to handle with concurrent workers
|
||||
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||
defaultStatediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
|
||||
case <-sds.QuitChan:
|
||||
log.Info("Quitting the statediff writing process", "worker", params.id)
|
||||
return
|
||||
@ -412,7 +391,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
||||
select {
|
||||
//Notify chain event channel of events
|
||||
case chainEvent := <-chainEventCh:
|
||||
statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh)))
|
||||
log.Debug("Loop(): chain event received", "event", chainEvent)
|
||||
// if we don't have any subscribers, do not process a statediff
|
||||
if atomic.LoadInt32(&sds.subscribers) == 0 {
|
||||
@ -644,7 +623,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// This function will check the status of geth syncing.
|
||||
// GetSyncStatus will check the status of geth syncing.
|
||||
// It will return false if geth has finished syncing.
|
||||
// It will return a true Geth is still syncing.
|
||||
func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
|
||||
@ -659,7 +638,7 @@ func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// This function calls GetSyncStatus to check if we have caught up to head.
|
||||
// WaitingForSync calls GetSyncStatus to check if we have caught up to head.
|
||||
// It will keep looking and checking if we have caught up to head.
|
||||
// It will only complete if we catch up to head, otherwise it will keep looping forever.
|
||||
func (sds *Service) WaitingForSync() error {
|
||||
@ -795,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
|
||||
// WriteStateDiffAt writes a state diff at the specific blockheight directly to the database
|
||||
// This operation cannot be performed back past the point of db pruning; it requires an archival node
|
||||
// for historical data
|
||||
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
|
||||
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID {
|
||||
sds.currentJobsMutex.Lock()
|
||||
defer sds.currentJobsMutex.Unlock()
|
||||
if id, has := sds.currentJobs[blockNumber]; has {
|
||||
return id
|
||||
}
|
||||
id := JobID(atomic.AddUint64(&sds.lastJobID, 1))
|
||||
sds.currentJobs[blockNumber] = id
|
||||
go func() {
|
||||
err := sds.writeStateDiffAt(blockNumber, params)
|
||||
sds.currentJobsMutex.Lock()
|
||||
delete(sds.currentJobs, blockNumber)
|
||||
sds.currentJobsMutex.Unlock()
|
||||
for _, sub := range sds.jobStatusSubs {
|
||||
sub.statusChan <- JobStatus{id, err}
|
||||
}
|
||||
}()
|
||||
return id
|
||||
}
|
||||
|
||||
func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error {
|
||||
log.Info("writing state diff at", "block height", blockNumber)
|
||||
|
||||
// use watched addresses from statediffing write loop if not provided
|
||||
@ -844,11 +843,12 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro
|
||||
|
||||
// Writes a state diff from the current block, parent state root, and provided params
|
||||
func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error {
|
||||
// log.Info("Writing state diff", "block height", block.Number().Uint64())
|
||||
var totalDifficulty *big.Int
|
||||
var receipts types.Receipts
|
||||
var err error
|
||||
var tx interfaces.Batch
|
||||
start, logger := countStateDiffBegin(block)
|
||||
defer countStateDiffEnd(start, logger, err)
|
||||
if params.IncludeTD {
|
||||
totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64())
|
||||
}
|
||||
@ -859,28 +859,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// defer handling of commit/rollback for any return case
|
||||
defer func() {
|
||||
if err := tx.Submit(err); err != nil {
|
||||
log.Error("batch transaction submission failed", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
output := func(node types2.StateNode) error {
|
||||
return sds.indexer.PushStateNode(tx, node, block.Hash().String())
|
||||
}
|
||||
codeOutput := func(c types2.CodeAndCodeHash) error {
|
||||
return sds.indexer.PushCodeAndCodeHash(tx, c)
|
||||
}
|
||||
|
||||
err = sds.Builder.WriteStateDiffObject(types2.StateRoots{
|
||||
NewStateRoot: block.Root(),
|
||||
OldStateRoot: parentRoot,
|
||||
}, params, output, codeOutput)
|
||||
// TODO this anti-pattern needs to be sorted out eventually
|
||||
if err := tx.Submit(err); err != nil {
|
||||
return fmt.Errorf("batch transaction submission failed: %s", err.Error())
|
||||
}
|
||||
|
||||
// allow dereferencing of parent, keep current locked as it should be the next parent
|
||||
sds.BlockChain.UnlockTrie(parentRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -891,8 +888,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
||||
err = sds.writeStateDiff(block, parentRoot, params)
|
||||
if err != nil && strings.Contains(err.Error(), deadlockDetected) {
|
||||
// Retry only when the deadlock is detected.
|
||||
if i != sds.maxRetry {
|
||||
log.Info("dead lock detected while writing statediff", "err", err, "retry number", i)
|
||||
if i+1 < sds.maxRetry {
|
||||
log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@ -901,7 +898,34 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo
|
||||
return err
|
||||
}
|
||||
|
||||
// Performs one of following operations on the watched addresses in writeLoopParams and the db:
|
||||
// SubscribeWriteStatus is used by the API to subscribe to the job status updates
|
||||
func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) {
|
||||
log.Info("Subscribing to job status updates", "subscription id", id)
|
||||
sds.Lock()
|
||||
if sds.jobStatusSubs == nil {
|
||||
sds.jobStatusSubs = map[rpc.ID]statusSubscription{}
|
||||
}
|
||||
sds.jobStatusSubs[id] = statusSubscription{
|
||||
statusChan: sub,
|
||||
quitChan: quitChan,
|
||||
}
|
||||
sds.Unlock()
|
||||
}
|
||||
|
||||
// UnsubscribeWriteStatus is used to unsubscribe from job status updates
|
||||
func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error {
|
||||
log.Info("Unsubscribing from job status updates", "subscription id", id)
|
||||
sds.Lock()
|
||||
close(sds.jobStatusSubs[id].quitChan)
|
||||
delete(sds.jobStatusSubs, id)
|
||||
if len(sds.jobStatusSubs) == 0 {
|
||||
sds.jobStatusSubs = nil
|
||||
}
|
||||
sds.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db:
|
||||
// add | remove | set | clear
|
||||
func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error {
|
||||
// lock writeLoopParams for a write
|
||||
|
@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo
|
||||
}
|
||||
|
||||
// WriteStateDiffAt mock method
|
||||
func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error {
|
||||
func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID {
|
||||
// TODO: something useful here
|
||||
return nil
|
||||
return 0
|
||||
}
|
||||
|
||||
// WriteStateDiffFor mock method
|
||||
@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SubscribeWriteStatus is used by the API to subscribe to the job status updates
|
||||
func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) {
|
||||
// TODO when WriteStateDiff methods are implemented
|
||||
}
|
||||
|
||||
// UnsubscribeWriteStatus is used to unsubscribe from job status updates
|
||||
func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error {
|
||||
// TODO when WriteStateDiff methods are implemented
|
||||
return nil
|
||||
}
|
||||
|
57
trie/sync.go
57
trie/sync.go
@ -19,6 +19,7 @@ package trie
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/prque"
|
||||
@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) {
|
||||
// retrieval scheduling.
|
||||
func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
// Gather all the children of the node, irrelevant whether known or not
|
||||
type child struct {
|
||||
type childNode struct {
|
||||
path []byte
|
||||
node node
|
||||
}
|
||||
var children []child
|
||||
var children []childNode
|
||||
|
||||
switch node := (object).(type) {
|
||||
case *shortNode:
|
||||
@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
if hasTerm(key) {
|
||||
key = key[:len(key)-1]
|
||||
}
|
||||
children = []child{{
|
||||
children = []childNode{{
|
||||
node: node.Val,
|
||||
path: append(append([]byte(nil), req.path...), key...),
|
||||
}}
|
||||
case *fullNode:
|
||||
for i := 0; i < 17; i++ {
|
||||
if node.Children[i] != nil {
|
||||
children = append(children, child{
|
||||
children = append(children, childNode{
|
||||
node: node.Children[i],
|
||||
path: append(append([]byte(nil), req.path...), byte(i)),
|
||||
})
|
||||
@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
panic(fmt.Sprintf("unknown node: %+v", node))
|
||||
}
|
||||
// Iterate over the children, and request all unknown ones
|
||||
requests := make([]*nodeRequest, 0, len(children))
|
||||
var (
|
||||
missing = make(chan *nodeRequest, len(children))
|
||||
pending sync.WaitGroup
|
||||
)
|
||||
for _, child := range children {
|
||||
// Notify any external watcher of a new key/value node
|
||||
if req.callback != nil {
|
||||
@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
|
||||
if s.membatch.hasNode(child.path) {
|
||||
continue
|
||||
}
|
||||
// If database says duplicate, then at least the trie node is present
|
||||
// and we hold the assumption that it's NOT legacy contract code.
|
||||
chash := common.BytesToHash(node)
|
||||
if rawdb.HasTrieNode(s.database, chash) {
|
||||
continue
|
||||
}
|
||||
// Locally unknown node, schedule for retrieval
|
||||
requests = append(requests, &nodeRequest{
|
||||
path: child.path,
|
||||
hash: chash,
|
||||
parent: req,
|
||||
callback: req.callback,
|
||||
})
|
||||
// Check the presence of children concurrently
|
||||
pending.Add(1)
|
||||
go func(child childNode) {
|
||||
defer pending.Done()
|
||||
|
||||
// If database says duplicate, then at least the trie node is present
|
||||
// and we hold the assumption that it's NOT legacy contract code.
|
||||
chash := common.BytesToHash(node)
|
||||
if rawdb.HasTrieNode(s.database, chash) {
|
||||
return
|
||||
}
|
||||
// Locally unknown node, schedule for retrieval
|
||||
missing <- &nodeRequest{
|
||||
path: child.path,
|
||||
hash: chash,
|
||||
parent: req,
|
||||
callback: req.callback,
|
||||
}
|
||||
}(child)
|
||||
}
|
||||
}
|
||||
pending.Wait()
|
||||
|
||||
requests := make([]*nodeRequest, 0, len(children))
|
||||
for done := false; !done; {
|
||||
select {
|
||||
case miss := <-missing:
|
||||
requests = append(requests, miss)
|
||||
default:
|
||||
done = true
|
||||
}
|
||||
}
|
||||
return requests, nil
|
||||
|
Loading…
Reference in New Issue
Block a user