adjust buffering to improve stability; doc.go; fix notifier

err handling
This commit is contained in:
Ian Norden 2019-06-06 21:24:17 -05:00 committed by Rob Mulholand
parent 3edcd7690f
commit 00cc1f89ff
5 changed files with 72 additions and 16 deletions

View File

@ -761,7 +761,7 @@ var (
StateDiffFlag = cli.BoolFlag{ StateDiffFlag = cli.BoolFlag{
Name: "statediff", Name: "statediff",
Usage: "Enables the calculation of state diffs between each block, persists these state diffs the configured persistence mode.", Usage: "Enables the processing of state diffs between each block",
} }
StateDiffPathsAndProofs = cli.BoolFlag{ StateDiffPathsAndProofs = cli.BoolFlag{
Name: "statediff.pathsandproofs", Name: "statediff.pathsandproofs",

View File

@ -2227,8 +2227,8 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
func BenchmarkBlockChain_1x1000Executions(b *testing.B) { func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
var ( var (
numTxs= 1000 numTxs = 1000
numBlocks= 1 numBlocks = 1
) )
b.StopTimer() b.StopTimer()
b.ResetTimer() b.ResetTimer()

View File

@ -56,23 +56,30 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
go func() { go func() {
// subscribe to events from the state diff service // subscribe to events from the state diff service
payloadChannel := make(chan Payload, 10) payloadChannel := make(chan Payload, chainEventChanSize)
quitChan := make(chan bool) quitChan := make(chan bool, 1)
api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan)
// loop and await state diff payloads and relay them to the subscriber with the notifier // loop and await state diff payloads and relay them to the subscriber with the notifier
for { for {
select { select {
case packet := <-payloadChannel: case packet := <-payloadChannel:
if err := notifier.Notify(rpcSub.ID, packet); err != nil { if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
log.Error("Failed to send state diff packet", "err", err) log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
if unSubErr != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
}
return
} }
case err := <-rpcSub.Err(): case err := <-rpcSub.Err():
log.Error("State diff service rpcSub error", err)
err = api.sds.Unsubscribe(rpcSub.ID)
if err != nil { if err != nil {
log.Error("Failed to unsubscribe from the state diff service", err) log.Error("State diff service rpcSub error: " + err.Error())
err = api.sds.Unsubscribe(rpcSub.ID)
if err != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
}
return
} }
return
case <-quitChan: case <-quitChan:
// don't need to unsubscribe, statediff service does so before sending the quit signal // don't need to unsubscribe, statediff service does so before sending the quit signal
return return

49
statediff/doc.go Normal file
View File

@ -0,0 +1,49 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
Package statediff provides an auxiliary service that processes state diff objects from incoming chain events,
relaying the objects to any rpc subscriptions.
The service is spun up using the below CLI flags
--statediff: boolean flag, turns on the service
--statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs.
--statediff.intermediatenodes: boolean flag, tells service to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only.
--statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes.
--statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3
If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag.
Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method,
with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream".
e.g.
cli, _ := rpc.Dial("ipcPathOrWsURL")
stateDiffPayloadChan := make(chan statediff.Payload, 20000)
rpcSub, err := cli.Subscribe(context.Background(), "statediff", stateDiffPayloadChan, "stream"})
for {
select {
case stateDiffPayload := <- stateDiffPayloadChan:
processPayload(stateDiffPayload)
case err := <= rpcSub.Err():
log.Error(err)
}
}
*/
package statediff

View File

@ -34,6 +34,8 @@ import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
const chainEventChanSize = 20000
type blockChain interface { type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block GetBlockByHash(hash common.Hash) *types.Block
@ -127,13 +129,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
} }
sds.lastBlock = currentBlock sds.lastBlock = currentBlock
if parentBlock == nil { if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number()))
"parent block hash", parentHash.String(),
"current block number", currentBlock.Number())
continue continue
} }
if err := sds.processStateDiff(currentBlock, parentBlock); err != nil { if err := sds.processStateDiff(currentBlock, parentBlock); err != nil {
log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) log.Error(fmt.Sprintf("Error building statediff for block %d; error: ", currentBlock.Number()) + err.Error())
} }
case err := <-errCh: case err := <-errCh:
log.Warn("Error from chain event subscription, breaking loop", "error", err) log.Warn("Error from chain event subscription, breaking loop", "error", err)
@ -210,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
func (sds *Service) Start(*p2p.Server) error { func (sds *Service) Start(*p2p.Server) error {
log.Info("Starting statediff service") log.Info("Starting statediff service")
chainEventCh := make(chan core.ChainEvent, 10) chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
go sds.Loop(chainEventCh) go sds.Loop(chainEventCh)
return nil return nil