From f3e4e85cd779d5838f147d65bdeb6a34c1638c6e Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Thu, 6 Jun 2019 21:24:17 -0500 Subject: [PATCH] adjust buffering to improve stability; doc.go; fix notifier err handling --- cmd/utils/flags.go | 2 +- core/blockchain_test.go | 4 ++-- statediff/api.go | 23 ++++++++++++------- statediff/doc.go | 49 +++++++++++++++++++++++++++++++++++++++++ statediff/service.go | 10 ++++----- 5 files changed, 72 insertions(+), 16 deletions(-) create mode 100644 statediff/doc.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index abc60c594..35107feb2 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -759,7 +759,7 @@ var ( StateDiffFlag = cli.BoolFlag{ Name: "statediff", - Usage: "Enables the calculation of state diffs between each block, persists these state diffs the configured persistence mode.", + Usage: "Enables the processing of state diffs between each block", } StateDiffPathsAndProofs = cli.BoolFlag{ Name: "statediff.pathsandproofs", diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 040096fd0..54b7d5b24 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -2227,8 +2227,8 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) { func BenchmarkBlockChain_1x1000Executions(b *testing.B) { var ( - numTxs= 1000 - numBlocks= 1 + numTxs = 1000 + numBlocks = 1 ) b.StopTimer() b.ResetTimer() diff --git a/statediff/api.go b/statediff/api.go index 946be1146..a05ef5510 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -56,23 +56,30 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e go func() { // subscribe to events from the state diff service - payloadChannel := make(chan Payload, 10) - quitChan := make(chan bool) + payloadChannel := make(chan Payload, chainEventChanSize) + quitChan := make(chan bool, 1) api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) // loop and await state diff payloads and relay them to the subscriber with the notifier for { select { case packet := <-payloadChannel: - if err := notifier.Notify(rpcSub.ID, packet); err != nil { - log.Error("Failed to send state diff packet", "err", err) + if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil { + log.Error("Failed to send state diff packet; error: " + notifyErr.Error()) + unSubErr := api.sds.Unsubscribe(rpcSub.ID) + if unSubErr != nil { + log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error()) + } + return } case err := <-rpcSub.Err(): - log.Error("State diff service rpcSub error", err) - err = api.sds.Unsubscribe(rpcSub.ID) if err != nil { - log.Error("Failed to unsubscribe from the state diff service", err) + log.Error("State diff service rpcSub error: " + err.Error()) + err = api.sds.Unsubscribe(rpcSub.ID) + if err != nil { + log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error()) + } + return } - return case <-quitChan: // don't need to unsubscribe, statediff service does so before sending the quit signal return diff --git a/statediff/doc.go b/statediff/doc.go new file mode 100644 index 000000000..0e6d5f3e1 --- /dev/null +++ b/statediff/doc.go @@ -0,0 +1,49 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +/* +This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go + +Package statediff provides an auxiliary service that processes state diff objects from incoming chain events, +relaying the objects to any rpc subscriptions. + +The service is spun up using the below CLI flags +--statediff: boolean flag, turns on the service +--statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs. +--statediff.intermediatenodes: boolean flag, tells service to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only. +--statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes. +--statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3 + +If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. + +Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method, +with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream". + +e.g. + +cli, _ := rpc.Dial("ipcPathOrWsURL") +stateDiffPayloadChan := make(chan statediff.Payload, 20000) +rpcSub, err := cli.Subscribe(context.Background(), "statediff", stateDiffPayloadChan, "stream"}) +for { + select { + case stateDiffPayload := <- stateDiffPayloadChan: + processPayload(stateDiffPayload) + case err := <= rpcSub.Err(): + log.Error(err) + } +} +*/ +package statediff diff --git a/statediff/service.go b/statediff/service.go index f3e47d16e..e978fcb8f 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -34,6 +34,8 @@ import ( "github.com/ethereum/go-ethereum/rpc" ) +const chainEventChanSize = 20000 + type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block @@ -127,13 +129,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { } sds.lastBlock = currentBlock if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", - "parent block hash", parentHash.String(), - "current block number", currentBlock.Number()) + log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number())) continue } if err := sds.processStateDiff(currentBlock, parentBlock); err != nil { - log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) + log.Error(fmt.Sprintf("Error building statediff for block %d; error: ", currentBlock.Number()) + err.Error()) } case err := <-errCh: log.Warn("Error from chain event subscription, breaking loop", "error", err) @@ -210,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Start(*p2p.Server) error { log.Info("Starting statediff service") - chainEventCh := make(chan core.ChainEvent, 10) + chainEventCh := make(chan core.ChainEvent, chainEventChanSize) go sds.Loop(chainEventCh) return nil