diff --git a/.github/workflows/manual_binary_publish.yaml b/.github/workflows/manual_binary_publish.yaml new file mode 100644 index 000000000..843aa3644 --- /dev/null +++ b/.github/workflows/manual_binary_publish.yaml @@ -0,0 +1,21 @@ +name: MANUAL Override Publish geth binary to release +on: + workflow_dispatch: + inputs: + giteaPublishTag: + description: 'Package to publish TO on gitea; e.g. v1.10.25-statediff-4.2.1-alpha' + required: true + cercContainerTag: + description: 'Tagged Container to extract geth binary FROM' + required: true +jobs: + build: + name: Manual override publish of geth binary FROM tagged release TO TAGGED package on git.vdb.to + runs-on: ubuntu-latest + steps: + - name: Copy ethereum binary file + run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{ github.event.inputs.cercContainerTag }} /usr/local/bin/geth > geth-linux-amd64 + - name: curl + uses: enflo/curl-action@master + with: + curl: --user cerccicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{ github.event.inputs.giteaPublishTag }}/geth-linux-amd64 diff --git a/.github/workflows/manual_publish.yaml b/.github/workflows/manual_publish.yaml new file mode 100644 index 000000000..e8990be1e --- /dev/null +++ b/.github/workflows/manual_publish.yaml @@ -0,0 +1,21 @@ +name: MANUAL Override Publish geth binary to release +on: + workflow_dispatch: + inputs: + giteaPublishTag: + description: 'Package to publish TO on gitea; e.g. v1.10.25-statediff-4.2.1-alpha' + required: true + cercContainerTag: + description: 'Tagged Container to extract geth binary FROM' + required: true +jobs: + build: + name: Manual override publish of geth binary FROM tagged release TO TAGGED package on git.vdb.to + runs-on: ubuntu-latest + steps: + - name: Copy ethereum binary file + run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{ github.event.inputs.cercContainerTag }} /usr/local/bin/geth > geth-linux-amd64 + - name: curl + uses: enflo/curl-action@master + with: + curl: --user circcicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{ github.event.inputs.giteaPublishTag }}/geth-linux-amd64 diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 610f8afbf..4feb43069 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -12,50 +12,25 @@ jobs: steps: - uses: actions/checkout@v2 - name: Run docker build - run: docker build -t vulcanize/go-ethereum -f Dockerfile . - - name: Get the version - id: vars - run: echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7}) - - name: Tag docker image - run: docker tag vulcanize/go-ethereum docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} - - name: Docker Login - run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin - - name: Docker Push - run: docker push docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} - push_to_registries: - name: Publish assets to Release - runs-on: ubuntu-latest - needs: build - steps: + run: docker build -t cerc-io/go-ethereum -f Dockerfile . - name: Get the version id: vars run: | echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7}) echo ::set-output name=tag::$(echo ${GITHUB_REF#refs/tags/}) - - name: Docker Login to Github Registry - run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin - - name: Docker Pull - run: docker pull docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} - - name: Copy ethereum binary file - run: docker run --rm --entrypoint cat docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} /usr/local/bin/geth > geth-linux-amd64 - - name: Docker Login to Docker Registry - run: echo ${{ secrets.VULCANIZEJENKINS_PAT }} | docker login -u vulcanizejenkins --password-stdin - name: Tag docker image - run: docker tag docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} vulcanize/vdb-geth:${{steps.vars.outputs.tag}} - - name: Docker Push to Docker Hub - run: docker push vulcanize/vdb-geth:${{steps.vars.outputs.tag}} - - name: Get release - id: get_release - uses: bruceadams/get-release@v1.2.0 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} - - name: Upload Release Asset - id: upload-release-asset - uses: actions/upload-release-asset@v1 - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + run: docker tag cerc-io/go-ethereum git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} + - name: Tag docker image + run: docker tag git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.tag}} + - name: Docker Login + run: echo ${{ secrets.GITEA_TOKEN }} | docker login https://git.vdb.to -u cerccicd --password-stdin + - name: Docker Push + run: docker push git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} + - name: Docker Push TAGGED + run: docker push git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.tag}} + - name: Copy ethereum binary file + run: docker run --rm --entrypoint cat git.vdb.to/cerc-io/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} /usr/local/bin/geth > geth-linux-amd64 + - name: curl + uses: enflo/curl-action@master with: - upload_url: ${{ steps.get_release.outputs.upload_url }} - asset_path: geth-linux-amd64 - asset_name: geth-linux-amd64 - asset_content_type: application/octet-stream + curl: --user cerccicd:${{ secrets.GITEA_TOKEN }} --upload-file geth-linux-amd64 https://git.vdb.to/api/packages/cerc-io/generic/go-ethereum/${{steps.vars.outputs.tag}}/geth-linux-amd64 diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c2de4fc02..94a9b02b8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -15,7 +15,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Run docker build - run: docker build -t vulcanize/go-ethereum . + run: docker build -t cerc-io/go-ethereum . geth-unit-test: name: Run geth unit test @@ -90,7 +90,7 @@ jobs: - uses: actions/checkout@v3 with: ref: ${{ env.ipld-eth-db-ref }} - repository: vulcanize/ipld-eth-db + repository: cerc-io/ipld-eth-db path: "./ipld-eth-db/" fetch-depth: 0 diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 000000000..98b02490a --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,50 @@ +pipeline { + agent any + + stages { + stage('Build') { + steps { + script{ + docker.withRegistry('https://git.vdb.to'){ + echo 'Building geth image...' + //def geth_image = docker.build("cerc-io/go-ethereum:jenkinscicd") + echo 'built geth image' + } + } + } + } + stage('Test') { + agent { + docker { + image 'cerc-io/foundation:jenkinscicd' + //image 'cerc-io/foundation_alpine:jenkinscicd' + } + } + + environment { + GO111MODULE = "on" + CGO_ENABLED = 1 + //GOPATH = "${JENKINS_HOME}/jobs/${JOB_NAME}/builds/${BUILD_ID}" + //GOPATH = "/go" + GOPATH = "/tmp/go" + //GOMODCACHE = "/go/pkg/mod" + GOCACHE = "${WORKSPACE}/.cache/go-build" + GOENV = "${WORKSPACE}/.config/go/env" + GOMODCACHE = "/tmp/go/pkg/mod" + GOWORK="" + //GOFLAGS="" + + } + steps { + echo 'Testing ...' + //sh '/usr/local/go/bin/go test -p 1 -v ./...' + sh 'make test' + } + } + stage('Packaging') { + steps { + echo 'Packaging ...' + } + } + } +} \ No newline at end of file diff --git a/Makefile b/Makefile index 706477c42..c06fa26b2 100644 --- a/Makefile +++ b/Makefile @@ -50,7 +50,7 @@ ios: @echo "Import \"$(GOBIN)/Geth.framework\" to use the library." test: all - $(GORUN) build/ci.go test + $(GORUN) build/ci.go test -v lint: ## Run linters. $(GORUN) build/ci.go lint diff --git a/cmd/geth/config.go b/cmd/geth/config.go index 7c1bb2dc5..ae5332e30 100644 --- a/cmd/geth/config.go +++ b/cmd/geth/config.go @@ -244,6 +244,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { ClientName: clientName, Driver: driverType, } + if ctx.IsSet(utils.StateDiffUpsert.Name) { + pgConfig.Upsert = ctx.Bool(utils.StateDiffUpsert.Name) + } if ctx.IsSet(utils.StateDiffDBMinConns.Name) { pgConfig.MinConns = ctx.Int(utils.StateDiffDBMinConns.Name) } @@ -262,6 +265,9 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { if ctx.IsSet(utils.StateDiffDBConnTimeout.Name) { pgConfig.ConnTimeout = time.Duration(ctx.Duration(utils.StateDiffDBConnTimeout.Name).Seconds()) } + if ctx.IsSet(utils.StateDiffLogStatements.Name) { + pgConfig.LogStatements = ctx.Bool(utils.StateDiffLogStatements.Name) + } indexerConfig = pgConfig case shared.DUMP: dumpTypeStr := ctx.String(utils.StateDiffDBDumpDst.Name) @@ -284,14 +290,13 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) { } } p := statediff.Config{ - IndexerConfig: indexerConfig, - KnownGapsFilePath: ctx.String(utils.StateDiffKnownGapsFilePath.Name), - ID: nodeID, - ClientName: clientName, - Context: context.Background(), - EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name), - NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name), - WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name), + IndexerConfig: indexerConfig, + ID: nodeID, + ClientName: clientName, + Context: context.Background(), + EnableWriteLoop: ctx.Bool(utils.StateDiffWritingFlag.Name), + NumWorkers: ctx.Uint(utils.StateDiffWorkersFlag.Name), + WaitForSync: ctx.Bool(utils.StateDiffWaitForSync.Name), } utils.RegisterStateDiffService(stack, eth, &cfg.Eth, p, backend) } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c267d122c..16c549fed 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -176,9 +176,10 @@ var ( utils.StateDiffFileMode, utils.StateDiffFileCsvDir, utils.StateDiffFilePath, - utils.StateDiffKnownGapsFilePath, utils.StateDiffWaitForSync, utils.StateDiffWatchedAddressesFilePath, + utils.StateDiffUpsert, + utils.StateDiffLogStatements, configFileFlag, }, utils.NetworkFlags, utils.DatabasePathFlags) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f7477c710..9f2624d3c 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1064,11 +1064,6 @@ var ( Name: "statediff.file.path", Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode", } - StateDiffKnownGapsFilePath = &cli.StringFlag{ - Name: "statediff.knowngapsfile.path", - Usage: "Full path (including filename) to write knownGaps statements when the DB is unavailable.", - Value: "./known_gaps.sql", - } StateDiffWatchedAddressesFilePath = &cli.StringFlag{ Name: "statediff.file.wapath", Usage: "Full path (including filename) to write statediff watched addresses out to when operating in file mode", @@ -1078,6 +1073,16 @@ var ( Usage: "Client name to use when writing state diffs to database", Value: "go-ethereum", } + StateDiffUpsert = &cli.BoolFlag{ + Name: "statediff.db.upsert", + Usage: "Should the statediff service overwrite data existing in the database?", + Value: false, + } + StateDiffLogStatements = &cli.BoolFlag{ + Name: "statediff.db.logstatements", + Usage: "Should the statediff service log all database statements? (Note: pgx only)", + Value: false, + } StateDiffWritingFlag = &cli.BoolFlag{ Name: "statediff.writing", Usage: "Activates progressive writing of state diffs to database as new block are synced", diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index aeba3690d..deb76a257 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -644,6 +644,23 @@ func ReadReceipts(db ethdb.Reader, hash common.Hash, number uint64, config *para return receipts } +// ReadStorageReceipts is a modification of ReadRawReceipts that skips a conversion to types.Receipt +// and returns the ReceipstForStorage instead +func ReadStorageReceipts(db ethdb.Reader, hash common.Hash, number uint64) []*types.ReceiptForStorage { + // Retrieve the flattened receipt slice + data := ReadReceiptsRLP(db, hash, number) + if len(data) == 0 { + return nil + } + // Convert the receipts from their storage form to their internal representation + var storageReceipts []*types.ReceiptForStorage + if err := rlp.DecodeBytes(data, &storageReceipts); err != nil { + log.Error("Invalid receipt array RLP", "hash", hash, "err", err) + return nil + } + return storageReceipts +} + // WriteReceipts stores all the transaction receipts belonging to a block. func WriteReceipts(db ethdb.KeyValueWriter, hash common.Hash, number uint64, receipts types.Receipts) { // Convert the receipts into their storage form and serialize them @@ -817,6 +834,17 @@ func WriteAncientBlocks(db ethdb.AncientWriter, blocks []*types.Block, receipts }) } +// WriteAncientBlock is a public wrapper of writeAncientBlock +func WriteAncientBlock(op ethdb.AncientWriteOp, block *types.Block, receipts types.Receipts, td *big.Int) error { + stReceipts := make([]*types.ReceiptForStorage, len(receipts)) + // Convert receipts to storage format and sum up total difficulty. + stReceipts = stReceipts[:0] + for i, receipt := range receipts { + stReceipts[i] = (*types.ReceiptForStorage)(receipt) + } + return writeAncientBlock(op, block, block.Header(), stReceipts, td) +} + func writeAncientBlock(op ethdb.AncientWriteOp, block *types.Block, header *types.Header, receipts []*types.ReceiptForStorage, td *big.Int) error { num := block.NumberU64() if err := op.AppendRaw(chainFreezerHashTable, num, block.Hash().Bytes()); err != nil { diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 0a70c9ece..8d80647fc 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -34,8 +34,8 @@ type Filter struct { addresses []common.Address topics [][]common.Hash - block common.Hash // Block hash if filtering a single block - begin, end int64 // Range interval if filtering multiple blocks + block *common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks matcher *bloombits.Matcher } @@ -78,7 +78,7 @@ func (sys *FilterSystem) NewRangeFilter(begin, end int64, addresses []common.Add func (sys *FilterSystem) NewBlockFilter(block common.Hash, addresses []common.Address, topics [][]common.Hash) *Filter { // Create a generic filter and convert it into a block filter filter := newFilter(sys, addresses, topics) - filter.block = block + filter.block = &block return filter } @@ -96,8 +96,8 @@ func newFilter(sys *FilterSystem, addresses []common.Address, topics [][]common. // first block that contains matches, updating the start of the filter accordingly. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // If we're doing singleton block filtering, execute and return - if f.block != (common.Hash{}) { - header, err := f.sys.backend.HeaderByHash(ctx, f.block) + if f.block != nil { + header, err := f.sys.backend.HeaderByHash(ctx, *f.block) if err != nil { return nil, err } diff --git a/eth/handler.go b/eth/handler.go index 4224a9f33..143147b0c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -391,11 +391,16 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { if h.checkpointHash != (common.Hash{}) { // Request the peer's checkpoint header for chain height/weight validation resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(h.checkpointNumber, 1, 0, false, resCh) + if err != nil { return err } // Start a timer to disconnect if the peer doesn't reply in time go func() { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -437,10 +442,15 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { // If we have any explicit peer required block hashes, request them for number, hash := range h.requiredBlocks { resCh := make(chan *eth.Response) - if _, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh); err != nil { + + req, err := peer.RequestHeadersByNumber(number, 1, 0, false, resCh) + if err != nil { return err } - go func(number uint64, hash common.Hash) { + go func(number uint64, hash common.Hash, req *eth.Request) { + // Ensure the request gets cancelled in case of error/drop + defer req.Close() + timeout := time.NewTimer(syncChallengeTimeout) defer timeout.Stop() @@ -469,7 +479,7 @@ func (h *handler) runEthPeer(peer *eth.Peer, handler eth.Handler) error { peer.Log().Warn("Required block challenge timed out, dropping", "addr", peer.RemoteAddr(), "type", peer.Name()) h.removePeer(peer.ID()) } - }(number, hash) + }(number, hash, req) } // Handle incoming messages until the connection is torn down return handler(peer) diff --git a/eth/protocols/snap/sync.go b/eth/protocols/snap/sync.go index deaa4456e..eb8260bf7 100644 --- a/eth/protocols/snap/sync.go +++ b/eth/protocols/snap/sync.go @@ -21,10 +21,12 @@ import ( "encoding/json" "errors" "fmt" + gomath "math" "math/big" "math/rand" "sort" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -78,6 +80,29 @@ const ( // and waste round trip times. If it's too high, we're capping responses and // waste bandwidth. maxTrieRequestCount = maxRequestSize / 512 + + // trienodeHealRateMeasurementImpact is the impact a single measurement has on + // the local node's trienode processing capacity. A value closer to 0 reacts + // slower to sudden changes, but it is also more stable against temporary hiccups. + trienodeHealRateMeasurementImpact = 0.005 + + // minTrienodeHealThrottle is the minimum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + minTrienodeHealThrottle = 1 + + // maxTrienodeHealThrottle is the maximum divisor for throttling trie node + // heal requests to avoid overloading the local node and exessively expanding + // the state trie bedth wise. + maxTrienodeHealThrottle = maxTrieRequestCount + + // trienodeHealThrottleIncrease is the multiplier for the throttle when the + // rate of arriving data is higher than the rate of processing it. + trienodeHealThrottleIncrease = 1.33 + + // trienodeHealThrottleDecrease is the divisor for the throttle when the + // rate of arriving data is lower than the rate of processing it. + trienodeHealThrottleDecrease = 1.25 ) var ( @@ -431,6 +456,11 @@ type Syncer struct { trienodeHealReqs map[uint64]*trienodeHealRequest // Trie node requests currently running bytecodeHealReqs map[uint64]*bytecodeHealRequest // Bytecode requests currently running + trienodeHealRate float64 // Average heal rate for processing trie node data + trienodeHealPend uint64 // Number of trie nodes currently pending for processing + trienodeHealThrottle float64 // Divisor for throttling the amount of trienode heal data requested + trienodeHealThrottled time.Time // Timestamp the last time the throttle was updated + trienodeHealSynced uint64 // Number of state trie nodes downloaded trienodeHealBytes common.StorageSize // Number of state trie bytes persisted to disk trienodeHealDups uint64 // Number of state trie nodes already processed @@ -476,9 +506,10 @@ func NewSyncer(db ethdb.KeyValueStore) *Syncer { trienodeHealIdlers: make(map[string]struct{}), bytecodeHealIdlers: make(map[string]struct{}), - trienodeHealReqs: make(map[uint64]*trienodeHealRequest), - bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), - stateWriter: db.NewBatch(), + trienodeHealReqs: make(map[uint64]*trienodeHealRequest), + bytecodeHealReqs: make(map[uint64]*bytecodeHealRequest), + trienodeHealThrottle: maxTrienodeHealThrottle, // Tune downward instead of insta-filling with junk + stateWriter: db.NewBatch(), extProgress: new(SyncProgress), } @@ -1321,6 +1352,10 @@ func (s *Syncer) assignTrienodeHealTasks(success chan *trienodeHealResponse, fai if cap > maxTrieRequestCount { cap = maxTrieRequestCount } + cap = int(float64(cap) / s.trienodeHealThrottle) + if cap <= 0 { + cap = 1 + } var ( hashes = make([]common.Hash, 0, cap) paths = make([]string, 0, cap) @@ -2090,6 +2125,10 @@ func (s *Syncer) processStorageResponse(res *storageResponse) { // processTrienodeHealResponse integrates an already validated trienode response // into the healer tasks. func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { + var ( + start = time.Now() + fills int + ) for i, hash := range res.hashes { node := res.nodes[i] @@ -2098,6 +2137,8 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { res.task.trieTasks[res.paths[i]] = res.hashes[i] continue } + fills++ + // Push the trie node into the state syncer s.trienodeHealSynced++ s.trienodeHealBytes += common.StorageSize(len(node)) @@ -2121,6 +2162,50 @@ func (s *Syncer) processTrienodeHealResponse(res *trienodeHealResponse) { log.Crit("Failed to persist healing data", "err", err) } log.Debug("Persisted set of healing data", "type", "trienodes", "bytes", common.StorageSize(batch.ValueSize())) + + // Calculate the processing rate of one filled trie node + rate := float64(fills) / (float64(time.Since(start)) / float64(time.Second)) + + // Update the currently measured trienode queueing and processing throughput. + // + // The processing rate needs to be updated uniformly independent if we've + // processed 1x100 trie nodes or 100x1 to keep the rate consistent even in + // the face of varying network packets. As such, we cannot just measure the + // time it took to process N trie nodes and update once, we need one update + // per trie node. + // + // Naively, that would be: + // + // for i:=0; i time.Second { + // Periodically adjust the trie node throttler + if float64(pending) > 2*s.trienodeHealRate { + s.trienodeHealThrottle *= trienodeHealThrottleIncrease + } else { + s.trienodeHealThrottle /= trienodeHealThrottleDecrease + } + if s.trienodeHealThrottle > maxTrienodeHealThrottle { + s.trienodeHealThrottle = maxTrienodeHealThrottle + } else if s.trienodeHealThrottle < minTrienodeHealThrottle { + s.trienodeHealThrottle = minTrienodeHealThrottle + } + s.trienodeHealThrottled = time.Now() + + log.Debug("Updated trie node heal throttler", "rate", s.trienodeHealRate, "pending", pending, "throttle", s.trienodeHealThrottle) + } } // processBytecodeHealResponse integrates an already validated bytecode response @@ -2248,14 +2333,18 @@ func (s *Syncer) OnAccounts(peer SyncPeer, id uint64, hashes []common.Hash, acco // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.accountIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.accountIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.accountReqs[id] if !ok { @@ -2360,14 +2449,18 @@ func (s *Syncer) onByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) error // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.bytecodeIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.bytecodeIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.bytecodeReqs[id] if !ok { @@ -2469,14 +2562,18 @@ func (s *Syncer) OnStorage(peer SyncPeer, id uint64, hashes [][]common.Hash, slo // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.storageIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.storageIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.storageReqs[id] if !ok { @@ -2596,14 +2693,18 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.trienodeHealIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.trienodeHealIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.trienodeHealReqs[id] if !ok { @@ -2639,10 +2740,12 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error // Cross reference the requested trienodes with the response to find gaps // that the serving node is missing - hasher := sha3.NewLegacyKeccak256().(crypto.KeccakState) - hash := make([]byte, 32) - - nodes := make([][]byte, len(req.hashes)) + var ( + hasher = sha3.NewLegacyKeccak256().(crypto.KeccakState) + hash = make([]byte, 32) + nodes = make([][]byte, len(req.hashes)) + fills uint64 + ) for i, j := 0, 0; i < len(trienodes); i++ { // Find the next hash that we've been served, leaving misses with nils hasher.Reset() @@ -2654,16 +2757,22 @@ func (s *Syncer) OnTrieNodes(peer SyncPeer, id uint64, trienodes [][]byte) error } if j < len(req.hashes) { nodes[j] = trienodes[i] + fills++ j++ continue } // We've either ran out of hashes, or got unrequested data logger.Warn("Unexpected healing trienodes", "count", len(trienodes)-i) + // Signal this request as failed, and ready for rescheduling s.scheduleRevertTrienodeHealRequest(req) return errors.New("unexpected healing trienode") } // Response validated, send it to the scheduler for filling + atomic.AddUint64(&s.trienodeHealPend, fills) + defer func() { + atomic.AddUint64(&s.trienodeHealPend, ^(fills - 1)) + }() response := &trienodeHealResponse{ paths: req.paths, task: req.task, @@ -2691,14 +2800,18 @@ func (s *Syncer) onHealByteCodes(peer SyncPeer, id uint64, bytecodes [][]byte) e // Whether or not the response is valid, we can mark the peer as idle and // notify the scheduler to assign a new task. If the response is invalid, // we'll drop the peer in a bit. + defer func() { + s.lock.Lock() + defer s.lock.Unlock() + if _, ok := s.peers[peer.ID()]; ok { + s.bytecodeHealIdlers[peer.ID()] = struct{}{} + } + select { + case s.update <- struct{}{}: + default: + } + }() s.lock.Lock() - if _, ok := s.peers[peer.ID()]; ok { - s.bytecodeHealIdlers[peer.ID()] = struct{}{} - } - select { - case s.update <- struct{}{}: - default: - } // Ensure the response is for a valid request req, ok := s.bytecodeHealReqs[id] if !ok { diff --git a/go.mod b/go.mod index 39f3d30de..1ce472bf6 100644 --- a/go.mod +++ b/go.mod @@ -146,4 +146,5 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.1.6 // indirect + ) diff --git a/go.sum b/go.sum index ed3d81799..342a8d8c5 100644 --- a/go.sum +++ b/go.sum @@ -135,6 +135,7 @@ github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5 h1:FtmdgXiUlNeRsoNMFlK github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61 h1:IZqZOB2fydHte3kUgxrzK5E1fW7RQGeDwE8F/ZZnUYc= github.com/garslo/gogen v0.0.0-20170306192744-1d203ffc1f61/go.mod h1:Q0X6pkwTILDlzrGEckF6HKjXe48EgsY/l7K7vhY4MW8= @@ -501,19 +502,18 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416 h1:shk/vn9oCoOTmwcouEdwIeOtOGA/ELRUw/GwvxwfT+0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.3 h1:OoxbjfXVZyod1fmWYhI7SEyaD8B00ynP3T+D5GiyHOY= github.com/onsi/ginkgo v1.10.3/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.7.1 h1:K0jcRCwNQM3vFGh1ppMtDh/+7ApJrjldlX8fA0jDTLQ= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= +github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -572,8 +572,6 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= -github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= -github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= @@ -597,8 +595,6 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/supranational/blst v0.3.8 h1:glwLF4oBRSJOTr05lRBgNwGQST0ndP2wg29fSeTRKCY= github.com/supranational/blst v0.3.8/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= -github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= -github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/thoas/go-funk v0.9.2 h1:oKlNYv0AY5nyf9g+/GhMgS/UO2ces0QRdPKwkhY3VCk= @@ -789,6 +785,7 @@ golang.org/x/sys v0.0.0-20210420205809-ac73e9fd8988/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/graphql/graphql.go b/graphql/graphql.go index 97b460c20..356ff669f 100644 --- a/graphql/graphql.go +++ b/graphql/graphql.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "math/big" + "sort" "strconv" "github.com/ethereum/go-ethereum" @@ -478,13 +479,16 @@ func (t *Transaction) getLogs(ctx context.Context) (*[]*Log, error) { if err != nil { return nil, err } - ret := make([]*Log, 0, len(logs)) - for _, log := range logs { + var ret []*Log + // Select tx logs from all block logs + ix := sort.Search(len(logs), func(i int) bool { return uint64(logs[i].TxIndex) >= t.index }) + for ix < len(logs) && uint64(logs[ix].TxIndex) == t.index { ret = append(ret, &Log{ r: t.r, transaction: t, - log: log, + log: logs[ix], }) + ix++ } return &ret, nil } diff --git a/graphql/graphql_test.go b/graphql/graphql_test.go index d55f4e063..491c73152 100644 --- a/graphql/graphql_test.go +++ b/graphql/graphql_test.go @@ -17,6 +17,8 @@ package graphql import ( + "context" + "encoding/json" "fmt" "io" "math/big" @@ -51,15 +53,21 @@ func TestBuildSchema(t *testing.T) { } defer stack.Close() // Make sure the schema can be parsed and matched up to the object model. - if err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { + if _, err := newHandler(stack, nil, nil, []string{}, []string{}); err != nil { t.Errorf("Could not construct GraphQL handler: %v", err) } } // Tests that a graphQL request is successfully handled when graphql is enabled on the specified endpoint func TestGraphQLBlockSerialization(t *testing.T) { - stack := createNode(t, true, false) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + } + newGQLService(t, stack, genesis, 10, func(i int, gen *core.BlockGen) {}) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -161,8 +169,55 @@ func TestGraphQLBlockSerialization(t *testing.T) { } func TestGraphQLBlockSerializationEIP2718(t *testing.T) { - stack := createNode(t, true, true) + // Account for signing txes + var ( + key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + address = crypto.PubkeyToAddress(key.PublicKey) + funds = big.NewInt(1000000000000000) + dad = common.HexToAddress("0x0000000000000000000000000000000000000dad") + ) + stack := createNode(t) defer stack.Close() + genesis := &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + address: {Balance: funds}, + // The address 0xdad sloads 0x00 and 0x01 + dad: { + Code: []byte{byte(vm.PC), byte(vm.PC), byte(vm.SLOAD), byte(vm.SLOAD)}, + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + BaseFee: big.NewInt(params.InitialBaseFee), + } + signer := types.LatestSigner(genesis.Config) + newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + gen.SetCoinbase(common.Address{1}) + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ + Nonce: uint64(0), + To: &dad, + Value: big.NewInt(100), + Gas: 50000, + GasPrice: big.NewInt(params.InitialBaseFee), + }) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.AccessListTx{ + ChainID: genesis.Config.ChainID, + Nonce: uint64(1), + To: &dad, + Gas: 30000, + GasPrice: big.NewInt(params.InitialBaseFee), + Value: big.NewInt(50), + AccessList: types.AccessList{{ + Address: dad, + StorageKeys: []common.Hash{{0}}, + }}, + }) + gen.AddTx(tx) + }) // start node if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -198,7 +253,7 @@ func TestGraphQLBlockSerializationEIP2718(t *testing.T) { // Tests that a graphQL request is not handled successfully when graphql is not enabled on the specified endpoint func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { - stack := createNode(t, false, false) + stack := createNode(t) defer stack.Close() if err := stack.Start(); err != nil { t.Fatalf("could not start node: %v", err) @@ -212,7 +267,59 @@ func TestGraphQLHTTPOnSamePort_GQLRequest_Unsuccessful(t *testing.T) { assert.Equal(t, http.StatusNotFound, resp.StatusCode) } -func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { +func TestGraphQLTransactionLogs(t *testing.T) { + var ( + key, _ = crypto.GenerateKey() + addr = crypto.PubkeyToAddress(key.PublicKey) + dadStr = "0x0000000000000000000000000000000000000dad" + dad = common.HexToAddress(dadStr) + genesis = &core.Genesis{ + Config: params.AllEthashProtocolChanges, + GasLimit: 11500000, + Difficulty: big.NewInt(1048576), + Alloc: core.GenesisAlloc{ + addr: {Balance: big.NewInt(params.Ether)}, + dad: { + // LOG0(0, 0), LOG0(0, 0), RETURN(0, 0) + Code: common.Hex2Bytes("60006000a060006000a060006000f3"), + Nonce: 0, + Balance: big.NewInt(0), + }, + }, + } + signer = types.LatestSigner(genesis.Config) + stack = createNode(t) + ) + defer stack.Close() + + handler := newGQLService(t, stack, genesis, 1, func(i int, gen *core.BlockGen) { + tx, _ := types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 1, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + tx, _ = types.SignNewTx(key, signer, &types.LegacyTx{To: &dad, Nonce: 2, Gas: 100000, GasPrice: big.NewInt(params.InitialBaseFee)}) + gen.AddTx(tx) + }) + // start node + if err := stack.Start(); err != nil { + t.Fatalf("could not start node: %v", err) + } + query := `{block { transactions { logs { account { address } } } } }` + res := handler.Schema.Exec(context.Background(), query, "", map[string]interface{}{}) + if res.Errors != nil { + t.Fatalf("graphql query failed: %v", res.Errors) + } + have, err := json.Marshal(res.Data) + if err != nil { + t.Fatalf("failed to encode graphql response: %s", err) + } + want := fmt.Sprintf(`{"block":{"transactions":[{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]},{"logs":[{"account":{"address":"%s"}},{"account":{"address":"%s"}}]}]}}`, dadStr, dadStr, dadStr, dadStr, dadStr, dadStr) + if string(have) != want { + t.Errorf("response unmatch. expected %s, got %s", want, have) + } +} + +func createNode(t *testing.T) *node.Node { stack, err := node.New(&node.Config{ HTTPHost: "127.0.0.1", HTTPPort: 0, @@ -222,25 +329,12 @@ func createNode(t *testing.T, gqlEnabled bool, txEnabled bool) *node.Node { if err != nil { t.Fatalf("could not create node: %v", err) } - if !gqlEnabled { - return stack - } - if !txEnabled { - createGQLService(t, stack) - } else { - createGQLServiceWithTransactions(t, stack) - } return stack } -func createGQLService(t *testing.T, stack *node.Node) { - // create backend +func newGQLService(t *testing.T, stack *node.Node, gspec *core.Genesis, genBlocks int, genfunc func(i int, gen *core.BlockGen)) *handler { ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - }, + Genesis: gspec, Ethash: ethash.Config{ PowMode: ethash.ModeFake, }, @@ -258,101 +352,16 @@ func createGQLService(t *testing.T, stack *node.Node) { } // Create some blocks and import them chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 10, func(i int, gen *core.BlockGen) {}) + ethash.NewFaker(), ethBackend.ChainDb(), genBlocks, genfunc) _, err = ethBackend.BlockChain().InsertChain(chain) if err != nil { t.Fatalf("could not create import blocks: %v", err) } - // create gql service + // Set up handler filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) - if err != nil { - t.Fatalf("could not create graphql service: %v", err) - } -} - -func createGQLServiceWithTransactions(t *testing.T, stack *node.Node) { - // create backend - key, _ := crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - address := crypto.PubkeyToAddress(key.PublicKey) - funds := big.NewInt(1000000000000000) - dad := common.HexToAddress("0x0000000000000000000000000000000000000dad") - - ethConf := ðconfig.Config{ - Genesis: &core.Genesis{ - Config: params.AllEthashProtocolChanges, - GasLimit: 11500000, - Difficulty: big.NewInt(1048576), - Alloc: core.GenesisAlloc{ - address: {Balance: funds}, - // The address 0xdad sloads 0x00 and 0x01 - dad: { - Code: []byte{ - byte(vm.PC), - byte(vm.PC), - byte(vm.SLOAD), - byte(vm.SLOAD), - }, - Nonce: 0, - Balance: big.NewInt(0), - }, - }, - BaseFee: big.NewInt(params.InitialBaseFee), - }, - Ethash: ethash.Config{ - PowMode: ethash.ModeFake, - }, - NetworkId: 1337, - TrieCleanCache: 5, - TrieCleanCacheJournal: "triecache", - TrieCleanCacheRejournal: 60 * time.Minute, - TrieDirtyCache: 5, - TrieTimeout: 60 * time.Minute, - SnapshotCache: 5, - } - - ethBackend, err := eth.New(stack, ethConf) - if err != nil { - t.Fatalf("could not create eth backend: %v", err) - } - signer := types.LatestSigner(ethConf.Genesis.Config) - - legacyTx, _ := types.SignNewTx(key, signer, &types.LegacyTx{ - Nonce: uint64(0), - To: &dad, - Value: big.NewInt(100), - Gas: 50000, - GasPrice: big.NewInt(params.InitialBaseFee), - }) - envelopTx, _ := types.SignNewTx(key, signer, &types.AccessListTx{ - ChainID: ethConf.Genesis.Config.ChainID, - Nonce: uint64(1), - To: &dad, - Gas: 30000, - GasPrice: big.NewInt(params.InitialBaseFee), - Value: big.NewInt(50), - AccessList: types.AccessList{{ - Address: dad, - StorageKeys: []common.Hash{{0}}, - }}, - }) - - // Create some blocks and import them - chain, _ := core.GenerateChain(params.AllEthashProtocolChanges, ethBackend.BlockChain().Genesis(), - ethash.NewFaker(), ethBackend.ChainDb(), 1, func(i int, b *core.BlockGen) { - b.SetCoinbase(common.Address{1}) - b.AddTx(legacyTx) - b.AddTx(envelopTx) - }) - - _, err = ethBackend.BlockChain().InsertChain(chain) - if err != nil { - t.Fatalf("could not create import blocks: %v", err) - } - // create gql service - filterSystem := filters.NewFilterSystem(ethBackend.APIBackend, filters.Config{}) - err = New(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) + handler, err := newHandler(stack, ethBackend.APIBackend, filterSystem, []string{}, []string{}) if err != nil { t.Fatalf("could not create graphql service: %v", err) } + return handler } diff --git a/graphql/service.go b/graphql/service.go index 019026bc7..6f6e58335 100644 --- a/graphql/service.go +++ b/graphql/service.go @@ -57,17 +57,18 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // New constructs a new GraphQL service instance. func New(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { - return newHandler(stack, backend, filterSystem, cors, vhosts) + _, err := newHandler(stack, backend, filterSystem, cors, vhosts) + return err } // newHandler returns a new `http.Handler` that will answer GraphQL queries. // It additionally exports an interactive query browser on the / endpoint. -func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) error { +func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters.FilterSystem, cors, vhosts []string) (*handler, error) { q := Resolver{backend, filterSystem} s, err := graphql.ParseSchema(schema, &q) if err != nil { - return err + return nil, err } h := handler{Schema: s} handler := node.NewHTTPHandlerStack(h, cors, vhosts, nil) @@ -76,5 +77,5 @@ func newHandler(stack *node.Node, backend ethapi.Backend, filterSystem *filters. stack.RegisterHandler("GraphQL", "/graphql", handler) stack.RegisterHandler("GraphQL", "/graphql/", handler) - return nil + return &h, nil } diff --git a/params/config.go b/params/config.go index 4d6eec893..80e671f9b 100644 --- a/params/config.go +++ b/params/config.go @@ -59,25 +59,26 @@ var ( // MainnetChainConfig is the chain parameters to run a node on the main network. MainnetChainConfig = &ChainConfig{ - ChainID: big.NewInt(1), - HomesteadBlock: big.NewInt(1_150_000), - DAOForkBlock: big.NewInt(1_920_000), - DAOForkSupport: true, - EIP150Block: big.NewInt(2_463_000), - EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), - EIP155Block: big.NewInt(2_675_000), - EIP158Block: big.NewInt(2_675_000), - ByzantiumBlock: big.NewInt(4_370_000), - ConstantinopleBlock: big.NewInt(7_280_000), - PetersburgBlock: big.NewInt(7_280_000), - IstanbulBlock: big.NewInt(9_069_000), - MuirGlacierBlock: big.NewInt(9_200_000), - BerlinBlock: big.NewInt(12_244_000), - LondonBlock: big.NewInt(12_965_000), - ArrowGlacierBlock: big.NewInt(13_773_000), - GrayGlacierBlock: big.NewInt(15_050_000), - TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 - Ethash: new(EthashConfig), + ChainID: big.NewInt(1), + HomesteadBlock: big.NewInt(1_150_000), + DAOForkBlock: big.NewInt(1_920_000), + DAOForkSupport: true, + EIP150Block: big.NewInt(2_463_000), + EIP150Hash: common.HexToHash("0x2086799aeebeae135c246c65021c82b4e15a2c451340993aacfd2751886514f0"), + EIP155Block: big.NewInt(2_675_000), + EIP158Block: big.NewInt(2_675_000), + ByzantiumBlock: big.NewInt(4_370_000), + ConstantinopleBlock: big.NewInt(7_280_000), + PetersburgBlock: big.NewInt(7_280_000), + IstanbulBlock: big.NewInt(9_069_000), + MuirGlacierBlock: big.NewInt(9_200_000), + BerlinBlock: big.NewInt(12_244_000), + LondonBlock: big.NewInt(12_965_000), + ArrowGlacierBlock: big.NewInt(13_773_000), + GrayGlacierBlock: big.NewInt(15_050_000), + TerminalTotalDifficulty: MainnetTerminalTotalDifficulty, // 58_750_000_000_000_000_000_000 + TerminalTotalDifficultyPassed: true, + Ethash: new(EthashConfig), } // MainnetTrustedCheckpoint contains the light client trusted checkpoint for the main network. diff --git a/params/version.go b/params/version.go index 234c4e339..26d5c2dcc 100644 --- a/params/version.go +++ b/params/version.go @@ -23,8 +23,8 @@ import ( const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 10 // Minor version component of the current release - VersionPatch = 23 // Patch version component of the current release - VersionMeta = "statediff-4.2.0-alpha" // Version metadata to append to the version string + VersionPatch = 26 // Patch version component of the current release + VersionMeta = "statediff-4.3.3-alpha" // Version metadata to append to the version string ) // Version holds the textual version string. diff --git a/rpc/client.go b/rpc/client.go index d3ce02977..fcd31319a 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -31,6 +31,7 @@ import ( ) var ( + ErrBadResult = errors.New("bad result in JSON-RPC response") ErrClientQuit = errors.New("client is closed") ErrNoResult = errors.New("no result in JSON-RPC response") ErrSubscriptionQueueOverflow = errors.New("subscription queue overflow") diff --git a/rpc/client_test.go b/rpc/client_test.go index 04c847d0d..5ae549d86 100644 --- a/rpc/client_test.go +++ b/rpc/client_test.go @@ -19,6 +19,7 @@ package rpc import ( "context" "encoding/json" + "errors" "fmt" "math/rand" "net" @@ -144,6 +145,53 @@ func TestClientBatchRequest(t *testing.T) { } } +func TestClientBatchRequest_len(t *testing.T) { + b, err := json.Marshal([]jsonrpcMessage{ + {Version: "2.0", ID: json.RawMessage("1"), Method: "foo", Result: json.RawMessage(`"0x1"`)}, + {Version: "2.0", ID: json.RawMessage("2"), Method: "bar", Result: json.RawMessage(`"0x2"`)}, + }) + if err != nil { + t.Fatal("failed to encode jsonrpc message:", err) + } + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + _, err := rw.Write(b) + if err != nil { + t.Error("failed to write response:", err) + } + })) + t.Cleanup(s.Close) + + client, err := Dial(s.URL) + if err != nil { + t.Fatal("failed to dial test server:", err) + } + defer client.Close() + + t.Run("too-few", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + {Method: "bar"}, + {Method: "baz"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) + + t.Run("too-many", func(t *testing.T) { + batch := []BatchElem{ + {Method: "foo"}, + } + ctx, cancelFn := context.WithTimeout(context.Background(), time.Second) + defer cancelFn() + if err := client.BatchCallContext(ctx, batch); !errors.Is(err, ErrBadResult) { + t.Errorf("expected %q but got: %v", ErrBadResult, err) + } + }) +} + func TestClientNotify(t *testing.T) { server := newTestServer() defer server.Stop() diff --git a/rpc/http.go b/rpc/http.go index d2e3e6eb1..7aca255e8 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -173,6 +173,9 @@ func (c *Client) sendBatchHTTP(ctx context.Context, op *requestOp, msgs []*jsonr if err := json.NewDecoder(respBody).Decode(&respmsgs); err != nil { return err } + if len(respmsgs) != len(msgs) { + return fmt.Errorf("batch has %d requests but response has %d: %w", len(msgs), len(respmsgs), ErrBadResult) + } for i := 0; i < len(respmsgs); i++ { op.resp <- &respmsgs[i] } diff --git a/statediff/README.md b/statediff/README.md index f262d7a8e..56a2fffd2 100644 --- a/statediff/README.md +++ b/statediff/README.md @@ -118,6 +118,8 @@ This service introduces a CLI flag namespace `statediff` `--statediff.db.clientname` is the client name to use in the Postgres database +`--statediff.db.upsert` whether or not the service, when operating in a direct database writing mode, should overwrite any existing conflicting data + `--statediff.file.path` full path (including filename) to write statediff data out to when operating in file mode `--statediff.file.wapath` full path (including filename) to write statediff watched addresses out to when operating in file mode diff --git a/statediff/api.go b/statediff/api.go index 0a7c5bba8..09622d20b 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -141,16 +141,73 @@ func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockN } // WriteStateDiffAt writes a state diff object directly to DB at the specific blockheight -func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) error { +func (api *PublicStateDiffAPI) WriteStateDiffAt(ctx context.Context, blockNumber uint64, params Params) JobID { + var err error + start, logger := countApiRequestBegin("writeStateDiffAt", blockNumber) + defer countApiRequestEnd(start, logger, err) + return api.sds.WriteStateDiffAt(blockNumber, params) } // WriteStateDiffFor writes a state diff object directly to DB for the specific block hash func (api *PublicStateDiffAPI) WriteStateDiffFor(ctx context.Context, blockHash common.Hash, params Params) error { - return api.sds.WriteStateDiffFor(blockHash, params) + var err error + start, logger := countApiRequestBegin("writeStateDiffFor", blockHash.Hex()) + defer countApiRequestEnd(start, logger, err) + + err = api.sds.WriteStateDiffFor(blockHash, params) + return err } // WatchAddress changes the list of watched addresses to which the direct indexing is restricted according to given operation func (api *PublicStateDiffAPI) WatchAddress(operation types.OperationType, args []types.WatchAddressArg) error { return api.sds.WatchAddress(operation, args) } + +// StreamWrites sets up a subscription that streams the status of completed calls to WriteStateDiff* +func (api *PublicStateDiffAPI) StreamWrites(ctx context.Context) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for events + rpcSub := notifier.CreateSubscription() + + go func() { + // subscribe to events from the statediff service + statusChan := make(chan JobStatus, chainEventChanSize) + quitChan := make(chan bool, 1) + api.sds.SubscribeWriteStatus(rpcSub.ID, statusChan, quitChan) + + var err error + defer func() { + if err != nil { + if err = api.sds.UnsubscribeWriteStatus(rpcSub.ID); err != nil { + log.Error("Failed to unsubscribe from job status stream: " + err.Error()) + } + } + }() + // loop and await payloads and relay them to the subscriber with the notifier + for { + select { + case status := <-statusChan: + if err = notifier.Notify(rpcSub.ID, status); err != nil { + log.Error("Failed to send job status; error: " + err.Error()) + return + } + case err = <-rpcSub.Err(): + if err != nil { + log.Error("State diff service rpcSub error: " + err.Error()) + return + } + case <-quitChan: + // don't need to unsubscribe, service does so before sending the quit signal + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/builder.go b/statediff/builder.go index c3ecbf117..58cba37f4 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -727,7 +727,7 @@ func (sdb *StateDiffBuilder) buildStorageNodesIncremental(oldSR common.Hash, new if bytes.Equal(newSR.Bytes(), oldSR.Bytes()) { return nil } - log.Debug("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) + log.Trace("Storage Roots for Incremental Diff", "old", oldSR.Hex(), "new", newSR.Hex()) oldTrie, err := sdb.StateCache.OpenTrie(oldSR) if err != nil { return err diff --git a/statediff/config.go b/statediff/config.go index c6b305b88..0e3195524 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -30,8 +30,6 @@ import ( type Config struct { // The configuration used for the stateDiff Indexer IndexerConfig interfaces.Config - // The filepath to write knownGaps insert statements if we can't connect to the DB. - KnownGapsFilePath string // A unique ID used for this service ID string // Name for the client this service is running diff --git a/statediff/indexer/constructor.go b/statediff/indexer/constructor.go index 1a4f64001..0f07e7410 100644 --- a/statediff/indexer/constructor.go +++ b/statediff/indexer/constructor.go @@ -65,7 +65,7 @@ func NewStateDiffIndexer(ctx context.Context, chainConfig *params.ChainConfig, n default: return nil, nil, fmt.Errorf("unrecognized Postgres driver type: %s", pgc.Driver) } - db := postgres.NewPostgresDB(driver) + db := postgres.NewPostgresDB(driver, pgc.Upsert) ind, err := sql.NewStateDiffIndexer(ctx, chainConfig, db) return db, ind, err case shared.DUMP: diff --git a/statediff/indexer/database/sql/batch_tx.go b/statediff/indexer/database/sql/batch_tx.go index 06bb49c9e..5f9d09b25 100644 --- a/statediff/indexer/database/sql/batch_tx.go +++ b/statediff/indexer/database/sql/batch_tx.go @@ -18,6 +18,7 @@ package sql import ( "context" + "sync" "sync/atomic" blockstore "github.com/ipfs/go-ipfs-blockstore" @@ -42,6 +43,8 @@ type BatchTx struct { iplds chan models.IPLDModel ipldCache models.IPLDBatch removedCacheFlag *uint32 + // Tracks expected cache size and ensures cache is caught up before flush + cacheWg sync.WaitGroup submit func(blockTx *BatchTx, err error) error } @@ -52,10 +55,21 @@ func (tx *BatchTx) Submit(err error) error { } func (tx *BatchTx) flush() error { + tx.cacheWg.Wait() _, err := tx.dbtx.Exec(tx.ctx, tx.stm, pq.Array(tx.ipldCache.BlockNumbers), pq.Array(tx.ipldCache.Keys), pq.Array(tx.ipldCache.Values)) if err != nil { - return err + log.Debug(insertError{"public.blocks", err, tx.stm, + struct { + blockNumbers []string + keys []string + values [][]byte + }{ + tx.ipldCache.BlockNumbers, + tx.ipldCache.Keys, + tx.ipldCache.Values, + }}.Error()) + return insertError{"public.blocks", err, tx.stm, "too many arguments; use debug mode for full list"} } tx.ipldCache = models.IPLDBatch{} return nil @@ -69,6 +83,7 @@ func (tx *BatchTx) cache() { tx.ipldCache.BlockNumbers = append(tx.ipldCache.BlockNumbers, i.BlockNumber) tx.ipldCache.Keys = append(tx.ipldCache.Keys, i.Key) tx.ipldCache.Values = append(tx.ipldCache.Values, i.Data) + tx.cacheWg.Done() case <-tx.quit: tx.ipldCache = models.IPLDBatch{} return @@ -77,6 +92,7 @@ func (tx *BatchTx) cache() { } func (tx *BatchTx) cacheDirect(key string, value []byte) { + tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ BlockNumber: tx.BlockNumber, Key: key, @@ -85,6 +101,7 @@ func (tx *BatchTx) cacheDirect(key string, value []byte) { } func (tx *BatchTx) cacheIPLD(i node.Node) { + tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ BlockNumber: tx.BlockNumber, Key: blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(i.Cid().Hash()).String(), @@ -98,6 +115,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error return "", "", err } prefixedKey := blockstore.BlockPrefix.String() + dshelp.MultihashToDsKey(c.Hash()).String() + tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ BlockNumber: tx.BlockNumber, Key: prefixedKey, @@ -109,6 +127,7 @@ func (tx *BatchTx) cacheRaw(codec, mh uint64, raw []byte) (string, string, error func (tx *BatchTx) cacheRemoved(key string, value []byte) { if atomic.LoadUint32(tx.removedCacheFlag) == 0 { atomic.StoreUint32(tx.removedCacheFlag, 1) + tx.cacheWg.Add(1) tx.iplds <- models.IPLDModel{ BlockNumber: tx.BlockNumber, Key: key, diff --git a/statediff/indexer/database/sql/indexer.go b/statediff/indexer/database/sql/indexer.go index 762107ee5..9e23405a0 100644 --- a/statediff/indexer/database/sql/indexer.go +++ b/statediff/indexer/database/sql/indexer.go @@ -122,11 +122,8 @@ func (sdi *StateDiffIndexer) PushBlock(block *types.Block, receipts types.Receip } t = time.Now() - // Begin new db tx for everything - tx, err := sdi.dbWriter.db.Begin(sdi.ctx) - if err != nil { - return nil, err - } + // Begin new DB tx for everything + tx := NewDelayedTx(sdi.dbWriter.db) defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) @@ -589,11 +586,8 @@ func (sdi *StateDiffIndexer) LoadWatchedAddresses() ([]common.Address, error) { } // InsertWatchedAddresses inserts the given addresses in the database -func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - tx, err := sdi.dbWriter.db.Begin(sdi.ctx) - if err != nil { - return err - } +func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { + tx := NewDelayedTx(sdi.dbWriter.db) defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) @@ -617,11 +611,8 @@ func (sdi *StateDiffIndexer) InsertWatchedAddresses(args []sdtypes.WatchAddressA } // RemoveWatchedAddresses removes the given watched addresses from the database -func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) error { - tx, err := sdi.dbWriter.db.Begin(sdi.ctx) - if err != nil { - return err - } +func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressArg) (err error) { + tx := NewDelayedTx(sdi.dbWriter.db) defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) @@ -644,11 +635,8 @@ func (sdi *StateDiffIndexer) RemoveWatchedAddresses(args []sdtypes.WatchAddressA } // SetWatchedAddresses clears and inserts the given addresses in the database -func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) error { - tx, err := sdi.dbWriter.db.Begin(sdi.ctx) - if err != nil { - return err - } +func (sdi *StateDiffIndexer) SetWatchedAddresses(args []sdtypes.WatchAddressArg, currentBlockNumber *big.Int) (err error) { + tx := NewDelayedTx(sdi.dbWriter.db) defer func() { if p := recover(); p != nil { rollback(sdi.ctx, tx) diff --git a/statediff/indexer/database/sql/lazy_tx.go b/statediff/indexer/database/sql/lazy_tx.go new file mode 100644 index 000000000..922bf84a0 --- /dev/null +++ b/statediff/indexer/database/sql/lazy_tx.go @@ -0,0 +1,55 @@ +package sql + +import ( + "context" +) + +type DelayedTx struct { + cache []cachedStmt + db Database +} +type cachedStmt struct { + sql string + args []interface{} +} + +func NewDelayedTx(db Database) *DelayedTx { + return &DelayedTx{db: db} +} + +func (tx *DelayedTx) QueryRow(ctx context.Context, sql string, args ...interface{}) ScannableRow { + return tx.db.QueryRow(ctx, sql, args...) +} + +func (tx *DelayedTx) Exec(ctx context.Context, sql string, args ...interface{}) (Result, error) { + tx.cache = append(tx.cache, cachedStmt{sql, args}) + return nil, nil +} + +func (tx *DelayedTx) Commit(ctx context.Context) error { + base, err := tx.db.Begin(ctx) + if err != nil { + return err + } + defer func() { + if p := recover(); p != nil { + rollback(ctx, base) + panic(p) + } else if err != nil { + rollback(ctx, base) + } + }() + for _, stmt := range tx.cache { + _, err := base.Exec(ctx, stmt.sql, stmt.args...) + if err != nil { + return err + } + } + tx.cache = nil + return base.Commit(ctx) +} + +func (tx *DelayedTx) Rollback(ctx context.Context) error { + tx.cache = nil + return nil +} diff --git a/statediff/indexer/database/sql/postgres/config.go b/statediff/indexer/database/sql/postgres/config.go index 27f78f8f4..b5cdc02ab 100644 --- a/statediff/indexer/database/sql/postgres/config.go +++ b/statediff/indexer/database/sql/postgres/config.go @@ -70,6 +70,7 @@ type Config struct { MaxConnIdleTime time.Duration MaxConnLifetime time.Duration ConnTimeout time.Duration + LogStatements bool // node info params ID string @@ -77,6 +78,9 @@ type Config struct { // driver type Driver DriverType + + // toggle on/off upserts + Upsert bool } // Type satisfies interfaces.Config diff --git a/statediff/indexer/database/sql/postgres/database.go b/statediff/indexer/database/sql/postgres/database.go index 5484ff8d8..27f89ab83 100644 --- a/statediff/indexer/database/sql/postgres/database.go +++ b/statediff/indexer/database/sql/postgres/database.go @@ -26,21 +26,27 @@ const ( ) // NewPostgresDB returns a postgres.DB using the provided driver -func NewPostgresDB(driver sql.Driver) *DB { - return &DB{driver} +func NewPostgresDB(driver sql.Driver, upsert bool) *DB { + return &DB{upsert, driver} } // DB implements sql.Database using a configured driver and Postgres statement syntax type DB struct { + upsert bool sql.Driver } // InsertHeaderStm satisfies the sql.Statements interface // Stm == Statement func (db *DB) InsertHeaderStm() string { - return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) + if db.upsert { + return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT (block_hash, block_number) DO UPDATE SET (parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) = ($3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, eth.header_cids.times_validated + 1, $16)` + } + return `INSERT INTO eth.header_cids (block_number, block_hash, parent_hash, cid, td, node_id, reward, state_root, tx_root, receipt_root, uncle_root, bloom, timestamp, mh_key, times_validated, coinbase) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + ON CONFLICT (block_hash, block_number) DO NOTHING` } // InsertUncleStm satisfies the sql.Statements interface @@ -75,8 +81,12 @@ func (db *DB) InsertLogStm() string { // InsertStateStm satisfies the sql.Statements interface func (db *DB) InsertStateStm() string { - return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + if db.upsert { + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (header_id, state_path, block_number) DO UPDATE SET (block_number, state_leaf_key, cid, node_type, diff, mh_key) = ($1, $3, $4, $6, $7, $8)` + } + return `INSERT INTO eth.state_cids (block_number, header_id, state_leaf_key, cid, state_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT (header_id, state_path, block_number) DO NOTHING` } // InsertAccountStm satisfies the sql.Statements interface @@ -87,8 +97,12 @@ func (db *DB) InsertAccountStm() string { // InsertStorageStm satisfies the sql.Statements interface func (db *DB) InsertStorageStm() string { - return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + if db.upsert { + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (header_id, state_path, storage_path, block_number) DO UPDATE SET (block_number, storage_leaf_key, cid, node_type, diff, mh_key) = ($1, $4, $5, $7, $8, $9)` + } + return `INSERT INTO eth.storage_cids (block_number, header_id, state_path, storage_leaf_key, cid, storage_path, node_type, diff, mh_key) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ON CONFLICT (header_id, state_path, storage_path, block_number) DO NOTHING` } // InsertIPLDStm satisfies the sql.Statements interface diff --git a/statediff/indexer/database/sql/postgres/log_adapter.go b/statediff/indexer/database/sql/postgres/log_adapter.go new file mode 100644 index 000000000..c3ceead46 --- /dev/null +++ b/statediff/indexer/database/sql/postgres/log_adapter.go @@ -0,0 +1,61 @@ +// Copyright © 2023 Cerc + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. + +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package postgres + +import ( + "context" + + "github.com/ethereum/go-ethereum/log" + "github.com/jackc/pgx/v4" +) + +type LogAdapter struct { + l log.Logger +} + +func NewLogAdapter(l log.Logger) *LogAdapter { + return &LogAdapter{l: l} +} + +func (l *LogAdapter) Log(ctx context.Context, level pgx.LogLevel, msg string, data map[string]interface{}) { + var logger log.Logger + if data != nil { + var args = make([]interface{}, 0) + for key, value := range data { + if value != nil { + args = append(args, key, value) + } + } + logger = l.l.New(args...) + } else { + logger = l.l + } + + switch level { + case pgx.LogLevelTrace: + logger.Trace(msg) + case pgx.LogLevelDebug: + logger.Debug(msg) + case pgx.LogLevelInfo: + logger.Info(msg) + case pgx.LogLevelWarn: + logger.Warn(msg) + case pgx.LogLevelError: + logger.Error(msg) + default: + logger.New("INVALID_PGX_LOG_LEVEL", level).Error(msg) + } +} diff --git a/statediff/indexer/database/sql/postgres/pgx.go b/statediff/indexer/database/sql/postgres/pgx.go index 936a3765d..9f1c4d571 100644 --- a/statediff/indexer/database/sql/postgres/pgx.go +++ b/statediff/indexer/database/sql/postgres/pgx.go @@ -20,6 +20,8 @@ import ( "context" "time" + "github.com/ethereum/go-ethereum/log" + "github.com/georgysavva/scany/pgxscan" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" @@ -85,6 +87,11 @@ func MakeConfig(config Config) (*pgxpool.Config, error) { if config.MaxConnIdleTime != 0 { conf.MaxConnIdleTime = config.MaxConnIdleTime } + + if config.LogStatements { + conf.ConnConfig.Logger = NewLogAdapter(log.New()) + } + return conf, nil } diff --git a/statediff/indexer/database/sql/postgres/test_helpers.go b/statediff/indexer/database/sql/postgres/test_helpers.go index 0eab778ae..f8311b413 100644 --- a/statediff/indexer/database/sql/postgres/test_helpers.go +++ b/statediff/indexer/database/sql/postgres/test_helpers.go @@ -31,7 +31,7 @@ func SetupSQLXDB() (sql.Database, error) { if err != nil { return nil, err } - return NewPostgresDB(driver), nil + return NewPostgresDB(driver, false), nil } // SetupPGXDB is used to setup a pgx db for tests @@ -40,5 +40,5 @@ func SetupPGXDB() (sql.Database, error) { if err != nil { return nil, err } - return NewPostgresDB(driver), nil + return NewPostgresDB(driver, false), nil } diff --git a/statediff/indexer/database/sql/writer.go b/statediff/indexer/database/sql/writer.go index 70d8ba45f..c1a67f2f8 100644 --- a/statediff/indexer/database/sql/writer.go +++ b/statediff/indexer/database/sql/writer.go @@ -55,7 +55,7 @@ func (w *Writer) upsertHeaderCID(tx Tx, header models.HeaderModel) error { header.Reward, header.StateRoot, header.TxRoot, header.RctRoot, header.UncleRoot, header.Bloom, header.Timestamp, header.MhKey, 1, header.Coinbase) if err != nil { - return fmt.Errorf("error upserting header_cids entry: %v", err) + return insertError{"eth.header_cids", err, w.db.InsertHeaderStm(), header} } indexerMetrics.blocks.Inc(1) return nil @@ -69,7 +69,7 @@ func (w *Writer) upsertUncleCID(tx Tx, uncle models.UncleModel) error { _, err := tx.Exec(w.db.Context(), w.db.InsertUncleStm(), uncle.BlockNumber, uncle.BlockHash, uncle.HeaderID, uncle.ParentHash, uncle.CID, uncle.Reward, uncle.MhKey) if err != nil { - return fmt.Errorf("error upserting uncle_cids entry: %v", err) + return insertError{"eth.uncle_cids", err, w.db.InsertUncleStm(), uncle} } return nil } @@ -83,7 +83,7 @@ func (w *Writer) upsertTransactionCID(tx Tx, transaction models.TxModel) error { transaction.BlockNumber, transaction.HeaderID, transaction.TxHash, transaction.CID, transaction.Dst, transaction.Src, transaction.Index, transaction.MhKey, transaction.Data, transaction.Type, transaction.Value) if err != nil { - return fmt.Errorf("error upserting transaction_cids entry: %v", err) + return insertError{"eth.transaction_cids", err, w.db.InsertTxStm(), transaction} } indexerMetrics.transactions.Inc(1) return nil @@ -98,7 +98,7 @@ func (w *Writer) upsertAccessListElement(tx Tx, accessListElement models.AccessL accessListElement.BlockNumber, accessListElement.TxID, accessListElement.Index, accessListElement.Address, accessListElement.StorageKeys) if err != nil { - return fmt.Errorf("error upserting access_list_element entry: %v", err) + return insertError{"eth.access_list_elements", err, w.db.InsertAccessListElementStm(), accessListElement} } indexerMetrics.accessListEntries.Inc(1) return nil @@ -113,7 +113,7 @@ func (w *Writer) upsertReceiptCID(tx Tx, rct *models.ReceiptModel) error { rct.BlockNumber, rct.HeaderID, rct.TxID, rct.LeafCID, rct.Contract, rct.ContractHash, rct.LeafMhKey, rct.PostState, rct.PostStatus, rct.LogRoot) if err != nil { - return fmt.Errorf("error upserting receipt_cids entry: %w", err) + return insertError{"eth.receipt_cids", err, w.db.InsertRctStm(), *rct} } indexerMetrics.receipts.Inc(1) return nil @@ -129,7 +129,7 @@ func (w *Writer) upsertLogCID(tx Tx, logs []*models.LogsModel) error { log.BlockNumber, log.HeaderID, log.LeafCID, log.LeafMhKey, log.ReceiptID, log.Address, log.Index, log.Topic0, log.Topic1, log.Topic2, log.Topic3, log.Data) if err != nil { - return fmt.Errorf("error upserting logs entry: %w", err) + return insertError{"eth.log_cids", err, w.db.InsertLogStm(), *log} } indexerMetrics.logs.Inc(1) } @@ -149,7 +149,7 @@ func (w *Writer) upsertStateCID(tx Tx, stateNode models.StateNodeModel) error { stateNode.BlockNumber, stateNode.HeaderID, stateKey, stateNode.CID, stateNode.Path, stateNode.NodeType, true, stateNode.MhKey) if err != nil { - return fmt.Errorf("error upserting state_cids entry: %v", err) + return insertError{"eth.state_cids", err, w.db.InsertStateStm(), stateNode} } return nil } @@ -163,7 +163,7 @@ func (w *Writer) upsertStateAccount(tx Tx, stateAccount models.StateAccountModel stateAccount.BlockNumber, stateAccount.HeaderID, stateAccount.StatePath, stateAccount.Balance, stateAccount.Nonce, stateAccount.CodeHash, stateAccount.StorageRoot) if err != nil { - return fmt.Errorf("error upserting state_accounts entry: %v", err) + return insertError{"eth.state_accounts", err, w.db.InsertAccountStm(), stateAccount} } return nil } @@ -181,7 +181,21 @@ func (w *Writer) upsertStorageCID(tx Tx, storageCID models.StorageNodeModel) err storageCID.BlockNumber, storageCID.HeaderID, storageCID.StatePath, storageKey, storageCID.CID, storageCID.Path, storageCID.NodeType, true, storageCID.MhKey) if err != nil { - return fmt.Errorf("error upserting storage_cids entry: %v", err) + return insertError{"eth.storage_cids", err, w.db.InsertStorageStm(), storageCID} } return nil } + +type insertError struct { + table string + err error + stmt string + arguments interface{} +} + +var _ error = insertError{} + +func (dbe insertError) Error() string { + return fmt.Sprintf("error inserting %s entry: %v\r\nstatement: %s\r\narguments: %+v", + dbe.table, dbe.err, dbe.stmt, dbe.arguments) +} diff --git a/statediff/metrics.go b/statediff/metrics.go index e67499b94..f3461ca21 100644 --- a/statediff/metrics.go +++ b/statediff/metrics.go @@ -26,6 +26,8 @@ const ( namespace = "statediff" ) +var defaultStatediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) + // Build a fully qualified metric name func metricName(subsystem, name string) string { if name == "" { @@ -58,6 +60,14 @@ type statediffMetricsHandles struct { knownGapErrorStart metrics.Gauge // A known gaps end block which had an error being written to the DB knownGapErrorEnd metrics.Gauge + + apiRequests metrics.Counter + apiRequestsUnderway metrics.Counter + + failed metrics.Counter + succeeded metrics.Counter + underway metrics.Counter + totalProcessingTime metrics.Gauge } func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { @@ -71,6 +81,12 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { knownGapEnd: metrics.NewGauge(), knownGapErrorStart: metrics.NewGauge(), knownGapErrorEnd: metrics.NewGauge(), + apiRequests: metrics.NewCounter(), + apiRequestsUnderway: metrics.NewCounter(), + failed: metrics.NewCounter(), + succeeded: metrics.NewCounter(), + underway: metrics.NewCounter(), + totalProcessingTime: metrics.NewGauge(), } subsys := "service" reg.Register(metricName(subsys, "last_sync_height"), ctx.lastSyncHeight) @@ -82,5 +98,11 @@ func RegisterStatediffMetrics(reg metrics.Registry) statediffMetricsHandles { reg.Register(metricName(subsys, "known_gaps_end"), ctx.knownGapEnd) reg.Register(metricName(subsys, "known_gaps_error_start"), ctx.knownGapErrorStart) reg.Register(metricName(subsys, "known_gaps_error_end"), ctx.knownGapErrorEnd) + reg.Register(metricName(subsys, "api_requests"), ctx.apiRequests) + reg.Register(metricName(subsys, "api_requests_underway"), ctx.apiRequestsUnderway) + reg.Register(metricName(subsys, "failed"), ctx.failed) + reg.Register(metricName(subsys, "succeeded"), ctx.succeeded) + reg.Register(metricName(subsys, "underway"), ctx.underway) + reg.Register(metricName(subsys, "total_processing_time"), ctx.totalProcessingTime) return ctx } diff --git a/statediff/metrics_helpers.go b/statediff/metrics_helpers.go new file mode 100644 index 000000000..2bebfe253 --- /dev/null +++ b/statediff/metrics_helpers.go @@ -0,0 +1,89 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package statediff + +import ( + "fmt" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +func countStateDiffBegin(block *types.Block) (time.Time, log.Logger) { + start := time.Now() + logger := log.New("hash", block.Hash().Hex(), "number", block.NumberU64()) + + defaultStatediffMetrics.underway.Inc(1) + logger.Debug(fmt.Sprintf("writeStateDiff BEGIN [underway=%d, succeeded=%d, failed=%d, total_time=%dms]", + defaultStatediffMetrics.underway.Count(), + defaultStatediffMetrics.succeeded.Count(), + defaultStatediffMetrics.failed.Count(), + defaultStatediffMetrics.totalProcessingTime.Value(), + )) + + return start, logger +} + +func countStateDiffEnd(start time.Time, logger log.Logger, err error) time.Duration { + duration := time.Since(start) + defaultStatediffMetrics.underway.Dec(1) + if nil == err { + defaultStatediffMetrics.succeeded.Inc(1) + } else { + defaultStatediffMetrics.failed.Inc(1) + } + defaultStatediffMetrics.totalProcessingTime.Inc(duration.Milliseconds()) + + logger.Debug(fmt.Sprintf("writeStateDiff END (duration=%dms, err=%t) [underway=%d, succeeded=%d, failed=%d, total_time=%dms]", + duration.Milliseconds(), nil != err, + defaultStatediffMetrics.underway.Count(), + defaultStatediffMetrics.succeeded.Count(), + defaultStatediffMetrics.failed.Count(), + defaultStatediffMetrics.totalProcessingTime.Value(), + )) + + return duration +} + +func countApiRequestBegin(methodName string, blockHashOrNumber interface{}) (time.Time, log.Logger) { + start := time.Now() + logger := log.New(methodName, blockHashOrNumber) + + defaultStatediffMetrics.apiRequests.Inc(1) + defaultStatediffMetrics.apiRequestsUnderway.Inc(1) + + logger.Debug(fmt.Sprintf("statediff API BEGIN [underway=%d, requests=%d])", + defaultStatediffMetrics.apiRequestsUnderway.Count(), + defaultStatediffMetrics.apiRequests.Count(), + )) + + return start, logger +} + +func countApiRequestEnd(start time.Time, logger log.Logger, err error) time.Duration { + duration := time.Since(start) + defaultStatediffMetrics.apiRequestsUnderway.Dec(1) + + logger.Debug(fmt.Sprintf("statediff API END (duration=%dms, err=%t) [underway=%d, requests=%d]", + duration.Milliseconds(), nil != err, + defaultStatediffMetrics.apiRequestsUnderway.Count(), + defaultStatediffMetrics.apiRequests.Count(), + )) + + return duration +} diff --git a/statediff/service.go b/statediff/service.go index b73247647..b5edd19fe 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -36,16 +36,13 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ind "github.com/ethereum/go-ethereum/statediff/indexer" - "github.com/ethereum/go-ethereum/statediff/indexer/database/sql" "github.com/ethereum/go-ethereum/statediff/indexer/interfaces" nodeinfo "github.com/ethereum/go-ethereum/statediff/indexer/node" - "github.com/ethereum/go-ethereum/statediff/indexer/shared" types2 "github.com/ethereum/go-ethereum/statediff/types" "github.com/ethereum/go-ethereum/trie" "github.com/thoas/go-funk" @@ -71,8 +68,6 @@ var writeLoopParams = ParamsWithMutex{ }, } -var statediffMetrics = RegisterStatediffMetrics(metrics.DefaultRegistry) - type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription CurrentBlock() *types.Block @@ -92,7 +87,7 @@ type IService interface { APIs() []rpc.API // Loop is the main event loop for processing state diffs Loop(chainEventCh chan core.ChainEvent) - // Subscribe method to subscribe to receive state diff processing output` + // Subscribe method to subscribe to receive state diff processing output Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool, params Params) // Unsubscribe method to unsubscribe from state diff processing Unsubscribe(id rpc.ID) error @@ -105,13 +100,18 @@ type IService interface { // StreamCodeAndCodeHash method to stream out all code and codehash pairs StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- types2.CodeAndCodeHash, quitChan chan<- bool) // WriteStateDiffAt method to write state diff object directly to DB - WriteStateDiffAt(blockNumber uint64, params Params) error + WriteStateDiffAt(blockNumber uint64, params Params) JobID // WriteStateDiffFor method to write state diff object directly to DB WriteStateDiffFor(blockHash common.Hash, params Params) error // WriteLoop event loop for progressively processing and writing diffs directly to DB WriteLoop(chainEventCh chan core.ChainEvent) // Method to change the addresses being watched in write loop params WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error + + // SubscribeWriteStatus method to subscribe to receive state diff processing output + SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) + // UnsubscribeWriteStatus method to unsubscribe from state diff processing + UnsubscribeWriteStatus(id rpc.ID) error } // Service is the underlying struct for the state diffing service @@ -134,9 +134,7 @@ type Service struct { BackendAPI ethapi.Backend // Should the statediff service wait for geth to sync to head? WaitForSync bool - // Used to signal if we should check for KnownGaps - KnownGaps KnownGapsState - // Whether or not we have any subscribers; only if we do, do we processes state diffs + // Whether we have any subscribers subscribers int32 // Interface for publishing statediffs as PG-IPLD objects indexer interfaces.StateDiffIndexer @@ -146,6 +144,27 @@ type Service struct { numWorkers uint // Number of retry for aborted transactions due to deadlock. maxRetry uint + // Write job status subscriptions + jobStatusSubs map[rpc.ID]statusSubscription + // Job ID ticker + lastJobID uint64 + // In flight jobs (for WriteStateDiffAt) + currentJobs map[uint64]JobID + currentJobsMutex sync.Mutex +} + +// IDs used for tracking in-progress jobs (0 for invalid) +type JobID uint64 + +// JobStatus represents the status of a completed job +type JobStatus struct { + id JobID + err error +} + +type statusSubscription struct { + statusChan chan<- JobStatus + quitChan chan<- bool } // BlockCache caches the last block for safe access from different service loops @@ -167,7 +186,6 @@ func NewBlockCache(max uint) BlockCache { func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params Config, backend ethapi.Backend) error { blockChain := ethServ.BlockChain() var indexer interfaces.StateDiffIndexer - var db sql.Database var err error quitCh := make(chan bool) indexerConfigAvailable := params.IndexerConfig != nil @@ -180,7 +198,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params ClientName: params.ClientName, } var err error - db, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig) + _, indexer, err = ind.NewStateDiffIndexer(params.Context, blockChain.Config(), info, params.IndexerConfig) if err != nil { return err } @@ -191,25 +209,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params if workers == 0 { workers = 1 } - // If we ever have multiple processingKeys we can update them here - // along with the expectedDifference - knownGaps := &KnownGapsState{ - processingKey: 0, - expectedDifference: big.NewInt(1), - errorState: false, - writeFilePath: params.KnownGapsFilePath, - db: db, - statediffMetrics: statediffMetrics, - sqlFileWaitingForWrite: false, - } - if indexerConfigAvailable { - if params.IndexerConfig.Type() == shared.POSTGRES { - knownGaps.checkForGaps = true - } else { - log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!") - knownGaps.checkForGaps = false - } - } + sds := &Service{ Mutex: sync.Mutex{}, BlockChain: blockChain, @@ -220,7 +220,6 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params BlockCache: NewBlockCache(workers), BackendAPI: backend, WaitForSync: params.WaitForSync, - KnownGaps: *knownGaps, indexer: indexer, enableWriteLoop: params.EnableWriteLoop, numWorkers: workers, @@ -294,8 +293,13 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) { for { select { case chainEvent := <-chainEventCh: - statediffMetrics.lastEventHeight.Update(int64(chainEvent.Block.Number().Uint64())) - statediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) + lastHeight := defaultStatediffMetrics.lastEventHeight.Value() + nextHeight := int64(chainEvent.Block.Number().Uint64()) + if nextHeight-lastHeight != 1 { + log.Warn("Statediffing service received block out-of-order", "next height", nextHeight, "last height", lastHeight) + } + defaultStatediffMetrics.lastEventHeight.Update(nextHeight) + defaultStatediffMetrics.writeLoopChannelLen.Update(int64(len(chainEventCh))) chainEventFwd <- chainEvent case err := <-errCh: log.Error("Error from chain event subscription", "error", err) @@ -333,7 +337,7 @@ func (sds *Service) writeGenesisStateDiff(currBlock *types.Block, workerId uint) genesisBlockNumber, "error", err.Error(), "worker", workerId) return } - statediffMetrics.lastStatediffHeight.Update(genesisBlockNumber) + defaultStatediffMetrics.lastStatediffHeight.Update(genesisBlockNumber) } func (sds *Service) writeLoopWorker(params workerParams) { @@ -355,46 +359,21 @@ func (sds *Service) writeLoopWorker(params workerParams) { sds.writeGenesisStateDiff(parentBlock, params.id) } - // If for any reason we need to check for gaps, - // Check and update the gaps table. - if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState { - log.Info("Checking for Gaps at", "current block", currentBlock.Number()) - go sds.KnownGaps.findAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey) - sds.KnownGaps.checkForGaps = false - } - log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id) writeLoopParams.RLock() err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams.Params) writeLoopParams.RUnlock() if err != nil { - log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id) - sds.KnownGaps.errorState = true - log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table", "blockNumber", currentBlock.Number()) - sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number()) - // Write object to startdiff + log.Error("statediff.Service.WriteLoop: processing error", + "block height", currentBlock.Number().Uint64(), + "block hash", currentBlock.Hash().Hex(), + "error", err.Error(), + "worker", params.id) continue } - sds.KnownGaps.errorState = false - if sds.KnownGaps.knownErrorBlocks != nil { - // We must pass in parameters by VALUE not reference. - // If we pass them in my reference, the references can change before the computation is complete! - staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks)) - copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks) - sds.KnownGaps.knownErrorBlocks = nil - go sds.KnownGaps.captureErrorBlocks(staticKnownErrorBlocks) - } - - if sds.KnownGaps.sqlFileWaitingForWrite { - log.Info("There are entries in the SQL file for knownGaps that should be written") - err := sds.KnownGaps.writeSqlFileStmtToDb() - if err != nil { - log.Error("Unable to write KnownGap sql file to DB") - } - } // TODO: how to handle with concurrent workers - statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) + defaultStatediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64())) case <-sds.QuitChan: log.Info("Quitting the statediff writing process", "worker", params.id) return @@ -412,7 +391,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { select { //Notify chain event channel of events case chainEvent := <-chainEventCh: - statediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) + defaultStatediffMetrics.serviceLoopChannelLen.Update(int64(len(chainEventCh))) log.Debug("Loop(): chain event received", "event", chainEvent) // if we don't have any subscribers, do not process a statediff if atomic.LoadInt32(&sds.subscribers) == 0 { @@ -644,7 +623,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { return nil } -// This function will check the status of geth syncing. +// GetSyncStatus will check the status of geth syncing. // It will return false if geth has finished syncing. // It will return a true Geth is still syncing. func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) { @@ -659,7 +638,7 @@ func (sds *Service) GetSyncStatus(pubEthAPI *ethapi.EthereumAPI) (bool, error) { return false, err } -// This function calls GetSyncStatus to check if we have caught up to head. +// WaitingForSync calls GetSyncStatus to check if we have caught up to head. // It will keep looking and checking if we have caught up to head. // It will only complete if we catch up to head, otherwise it will keep looping forever. func (sds *Service) WaitingForSync() error { @@ -795,7 +774,27 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ // WriteStateDiffAt writes a state diff at the specific blockheight directly to the database // This operation cannot be performed back past the point of db pruning; it requires an archival node // for historical data -func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error { +func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) JobID { + sds.currentJobsMutex.Lock() + defer sds.currentJobsMutex.Unlock() + if id, has := sds.currentJobs[blockNumber]; has { + return id + } + id := JobID(atomic.AddUint64(&sds.lastJobID, 1)) + sds.currentJobs[blockNumber] = id + go func() { + err := sds.writeStateDiffAt(blockNumber, params) + sds.currentJobsMutex.Lock() + delete(sds.currentJobs, blockNumber) + sds.currentJobsMutex.Unlock() + for _, sub := range sds.jobStatusSubs { + sub.statusChan <- JobStatus{id, err} + } + }() + return id +} + +func (sds *Service) writeStateDiffAt(blockNumber uint64, params Params) error { log.Info("writing state diff at", "block height", blockNumber) // use watched addresses from statediffing write loop if not provided @@ -844,11 +843,12 @@ func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) erro // Writes a state diff from the current block, parent state root, and provided params func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, params Params) error { - // log.Info("Writing state diff", "block height", block.Number().Uint64()) var totalDifficulty *big.Int var receipts types.Receipts var err error var tx interfaces.Batch + start, logger := countStateDiffBegin(block) + defer countStateDiffEnd(start, logger, err) if params.IncludeTD { totalDifficulty = sds.BlockChain.GetTd(block.Hash(), block.NumberU64()) } @@ -859,28 +859,25 @@ func (sds *Service) writeStateDiff(block *types.Block, parentRoot common.Hash, p if err != nil { return err } - // defer handling of commit/rollback for any return case - defer func() { - if err := tx.Submit(err); err != nil { - log.Error("batch transaction submission failed", "err", err) - } - }() + output := func(node types2.StateNode) error { return sds.indexer.PushStateNode(tx, node, block.Hash().String()) } codeOutput := func(c types2.CodeAndCodeHash) error { return sds.indexer.PushCodeAndCodeHash(tx, c) } + err = sds.Builder.WriteStateDiffObject(types2.StateRoots{ NewStateRoot: block.Root(), OldStateRoot: parentRoot, }, params, output, codeOutput) + // TODO this anti-pattern needs to be sorted out eventually + if err := tx.Submit(err); err != nil { + return fmt.Errorf("batch transaction submission failed: %s", err.Error()) + } // allow dereferencing of parent, keep current locked as it should be the next parent sds.BlockChain.UnlockTrie(parentRoot) - if err != nil { - return err - } return nil } @@ -891,8 +888,8 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo err = sds.writeStateDiff(block, parentRoot, params) if err != nil && strings.Contains(err.Error(), deadlockDetected) { // Retry only when the deadlock is detected. - if i != sds.maxRetry { - log.Info("dead lock detected while writing statediff", "err", err, "retry number", i) + if i+1 < sds.maxRetry { + log.Warn("dead lock detected while writing statediff", "err", err, "retry number", i) } continue } @@ -901,7 +898,34 @@ func (sds *Service) writeStateDiffWithRetry(block *types.Block, parentRoot commo return err } -// Performs one of following operations on the watched addresses in writeLoopParams and the db: +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *Service) SubscribeWriteStatus(id rpc.ID, sub chan<- JobStatus, quitChan chan<- bool) { + log.Info("Subscribing to job status updates", "subscription id", id) + sds.Lock() + if sds.jobStatusSubs == nil { + sds.jobStatusSubs = map[rpc.ID]statusSubscription{} + } + sds.jobStatusSubs[id] = statusSubscription{ + statusChan: sub, + quitChan: quitChan, + } + sds.Unlock() +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *Service) UnsubscribeWriteStatus(id rpc.ID) error { + log.Info("Unsubscribing from job status updates", "subscription id", id) + sds.Lock() + close(sds.jobStatusSubs[id].quitChan) + delete(sds.jobStatusSubs, id) + if len(sds.jobStatusSubs) == 0 { + sds.jobStatusSubs = nil + } + sds.Unlock() + return nil +} + +// WatchAddress performs one of following operations on the watched addresses in writeLoopParams and the db: // add | remove | set | clear func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.WatchAddressArg) error { // lock writeLoopParams for a write diff --git a/statediff/test_helpers/mocks/service.go b/statediff/test_helpers/mocks/service.go index ab183fddd..1ecd80ec8 100644 --- a/statediff/test_helpers/mocks/service.go +++ b/statediff/test_helpers/mocks/service.go @@ -189,9 +189,9 @@ func (sds *MockStateDiffService) newPayload(stateObject []byte, block *types.Blo } // WriteStateDiffAt mock method -func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) error { +func (sds *MockStateDiffService) WriteStateDiffAt(blockNumber uint64, params statediff.Params) statediff.JobID { // TODO: something useful here - return nil + return 0 } // WriteStateDiffFor mock method @@ -436,3 +436,14 @@ func (sds *MockStateDiffService) WatchAddress(operation sdtypes.OperationType, a return nil } + +// SubscribeWriteStatus is used by the API to subscribe to the job status updates +func (sds *MockStateDiffService) SubscribeWriteStatus(id rpc.ID, sub chan<- statediff.JobStatus, quitChan chan<- bool) { + // TODO when WriteStateDiff methods are implemented +} + +// UnsubscribeWriteStatus is used to unsubscribe from job status updates +func (sds *MockStateDiffService) UnsubscribeWriteStatus(id rpc.ID) error { + // TODO when WriteStateDiff methods are implemented + return nil +} diff --git a/trie/sync.go b/trie/sync.go index f4fdf1432..2f79242ca 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -19,6 +19,7 @@ package trie import ( "errors" "fmt" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" @@ -381,11 +382,11 @@ func (s *Sync) scheduleCodeRequest(req *codeRequest) { // retrieval scheduling. func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { // Gather all the children of the node, irrelevant whether known or not - type child struct { + type childNode struct { path []byte node node } - var children []child + var children []childNode switch node := (object).(type) { case *shortNode: @@ -393,14 +394,14 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if hasTerm(key) { key = key[:len(key)-1] } - children = []child{{ + children = []childNode{{ node: node.Val, path: append(append([]byte(nil), req.path...), key...), }} case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { - children = append(children, child{ + children = append(children, childNode{ node: node.Children[i], path: append(append([]byte(nil), req.path...), byte(i)), }) @@ -410,7 +411,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { panic(fmt.Sprintf("unknown node: %+v", node)) } // Iterate over the children, and request all unknown ones - requests := make([]*nodeRequest, 0, len(children)) + var ( + missing = make(chan *nodeRequest, len(children)) + pending sync.WaitGroup + ) for _, child := range children { // Notify any external watcher of a new key/value node if req.callback != nil { @@ -433,19 +437,36 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) { if s.membatch.hasNode(child.path) { continue } - // If database says duplicate, then at least the trie node is present - // and we hold the assumption that it's NOT legacy contract code. - chash := common.BytesToHash(node) - if rawdb.HasTrieNode(s.database, chash) { - continue - } - // Locally unknown node, schedule for retrieval - requests = append(requests, &nodeRequest{ - path: child.path, - hash: chash, - parent: req, - callback: req.callback, - }) + // Check the presence of children concurrently + pending.Add(1) + go func(child childNode) { + defer pending.Done() + + // If database says duplicate, then at least the trie node is present + // and we hold the assumption that it's NOT legacy contract code. + chash := common.BytesToHash(node) + if rawdb.HasTrieNode(s.database, chash) { + return + } + // Locally unknown node, schedule for retrieval + missing <- &nodeRequest{ + path: child.path, + hash: chash, + parent: req, + callback: req.callback, + } + }(child) + } + } + pending.Wait() + + requests := make([]*nodeRequest, 0, len(children)) + for done := false; !done; { + select { + case miss := <-missing: + requests = append(requests, miss) + default: + done = true } } return requests, nil