review fixes; proper method signature for api; adjust service so that statediff processing is halted/paused until there is at least one subscriber listening for the results

This commit is contained in:
Ian Norden 2019-06-05 13:10:04 -05:00
parent 8543581b5d
commit ad87bad4de
11 changed files with 73 additions and 71 deletions

View File

@ -257,8 +257,8 @@ var AppHelpFlagGroups = []flagGroup{
utils.StateDiffFlag, utils.StateDiffFlag,
utils.StateDiffPathsAndProofs, utils.StateDiffPathsAndProofs,
utils.StateDiffIntermediateNodes, utils.StateDiffIntermediateNodes,
utils.StateDiffWatchedAddresses,
utils.StateDiffStreamBlock, utils.StateDiffStreamBlock,
utils.StateDiffWatchedAddresses,
}, },
}, },
{ {

View File

@ -769,14 +769,14 @@ var (
Name: "statediff.intermediatenodes", Name: "statediff.intermediatenodes",
Usage: "Set to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only", Usage: "Set to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only",
} }
StateDiffWatchedAddresses = cli.StringSliceFlag{
Name: "statediff.watchedaddresses",
Usage: "If provided, state diffing process is restricted to these addresses",
}
StateDiffStreamBlock = cli.BoolFlag{ StateDiffStreamBlock = cli.BoolFlag{
Name: "statediff.streamblock", Name: "statediff.streamblock",
Usage: "Set to true to stream the block data alongside state diff data in the same subscription payload", Usage: "Set to true to stream the block data alongside state diff data in the same subscription payload",
} }
StateDiffWatchedAddresses = cli.StringSliceFlag{
Name: "statediff.watchedaddresses",
Usage: "If provided, state diffing process is restricted to these addresses",
}
) )
// MakeDataDir retrieves the currently requested data directory, terminating // MakeDataDir retrieves the currently requested data directory, terminating
@ -1651,9 +1651,9 @@ func RegisterGraphQLService(stack *node.Node, endpoint string, cors, vhosts []st
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC // RegisterStateDiffService configures and registers a service to stream state diff data over RPC
func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) { func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) {
config := statediff.Config{ config := statediff.Config{
StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name),
PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name), PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name),
AllNodes: ctx.GlobalBool(StateDiffIntermediateNodes.Name), IntermediateNodes: ctx.GlobalBool(StateDiffIntermediateNodes.Name),
StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name),
WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name), WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name),
} }
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {

View File

@ -43,8 +43,8 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
} }
} }
// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created // Stream is the public method to setup a subscription that fires off state-diff payloads as they are created
func (api *PublicStateDiffAPI) Subscribe(ctx context.Context, payloadChan chan Payload) (*rpc.Subscription, error) { func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
if !supported { if !supported {
@ -68,8 +68,6 @@ func (api *PublicStateDiffAPI) Subscribe(ctx context.Context, payloadChan chan P
} }
case err := <-rpcSub.Err(): case err := <-rpcSub.Err():
log.Error("State diff service rpcSub error", err) log.Error("State diff service rpcSub error", err)
println("err")
println(err.Error())
err = api.sds.Unsubscribe(rpcSub.ID) err = api.sds.Unsubscribe(rpcSub.ID)
if err != nil { if err != nil {
log.Error("Failed to unsubscribe from the state diff service", err) log.Error("Failed to unsubscribe from the state diff service", err)

View File

@ -163,7 +163,7 @@ func (sdb *builder) collectDiffNodes(a, b trie.NodeIterator) (AccountsMap, error
// record account to diffs (creation if we are looking at new - old; deletion if old - new) // record account to diffs (creation if we are looking at new - old; deletion if old - new)
log.Debug("Account lookup successful", "address", leafKeyHash, "account", account) log.Debug("Account lookup successful", "address", leafKeyHash, "account", account)
diffAccounts[leafKeyHash] = aw diffAccounts[leafKeyHash] = aw
} else if sdb.config.AllNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { } else if sdb.config.IntermediateNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) {
nodeKey := it.Hash() nodeKey := it.Hash()
node, err := sdb.stateCache.TrieDB().Node(nodeKey) node, err := sdb.stateCache.TrieDB().Node(nodeKey)
if err != nil { if err != nil {
@ -297,7 +297,7 @@ func (sdb *builder) buildStorageDiffsFromTrie(it trie.NodeIterator) ([]StorageDi
sd.Path = leafPath sd.Path = leafPath
} }
storageDiffs = append(storageDiffs, sd) storageDiffs = append(storageDiffs, sd)
} else if sdb.config.AllNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { } else if sdb.config.IntermediateNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) {
nodeKey := it.Hash() nodeKey := it.Hash()
node, err := sdb.stateCache.TrieDB().Node(nodeKey) node, err := sdb.stateCache.TrieDB().Node(nodeKey)
if err != nil { if err != nil {

View File

@ -149,7 +149,7 @@ func TestBuilder(t *testing.T) {
block3 = blockMap[block3Hash] block3 = blockMap[block3Hash]
config := statediff.Config{ config := statediff.Config{
PathsAndProofs: true, PathsAndProofs: true,
AllNodes: false, IntermediateNodes: false,
} }
builder = statediff.NewBuilder(testhelpers.Testdb, chain, config) builder = statediff.NewBuilder(testhelpers.Testdb, chain, config)
@ -383,7 +383,7 @@ func TestBuilderWithWatchedAddressList(t *testing.T) {
block3 = blockMap[block3Hash] block3 = blockMap[block3Hash]
config := statediff.Config{ config := statediff.Config{
PathsAndProofs: true, PathsAndProofs: true,
AllNodes: false, IntermediateNodes: false,
WatchedAddresses: []string{testhelpers.Account1Addr.Hex(), testhelpers.ContractAddr.Hex()}, WatchedAddresses: []string{testhelpers.Account1Addr.Hex(), testhelpers.ContractAddr.Hex()},
} }
builder = statediff.NewBuilder(testhelpers.Testdb, chain, config) builder = statediff.NewBuilder(testhelpers.Testdb, chain, config)

View File

@ -18,8 +18,8 @@ package statediff
// Config is used to carry in parameters from CLI configuration // Config is used to carry in parameters from CLI configuration
type Config struct { type Config struct {
StreamBlock bool
PathsAndProofs bool PathsAndProofs bool
AllNodes bool IntermediateNodes bool
StreamBlock bool
WatchedAddresses []string WatchedAddresses []string
} }

View File

@ -20,8 +20,7 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
@ -31,6 +30,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
) )
@ -68,6 +68,8 @@ type Service struct {
lastBlock *types.Block lastBlock *types.Block
// Whether or not the block data is streamed alongside the state diff data in the subscription payload // Whether or not the block data is streamed alongside the state diff data in the subscription payload
streamBlock bool streamBlock bool
// Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32
} }
// NewStateDiffService creates a new StateDiffingService // NewStateDiffService creates a new StateDiffingService
@ -110,6 +112,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
//Notify chain event channel of events //Notify chain event channel of events
case chainEvent := <-chainEventCh: case chainEvent := <-chainEventCh:
log.Debug("Event received from chainEventCh", "event", chainEvent) log.Debug("Event received from chainEventCh", "event", chainEvent)
// if we don't have any subscribers, do not process a statediff
if atomic.LoadInt32(&sds.subscribers) == 0 {
log.Debug("Currently no subscribers to the statediffing service; processing is halted")
continue
}
currentBlock := chainEvent.Block currentBlock := chainEvent.Block
parentHash := currentBlock.ParentHash() parentHash := currentBlock.ParentHash()
var parentBlock *types.Block var parentBlock *types.Block
@ -125,7 +132,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
"current block number", currentBlock.Number()) "current block number", currentBlock.Number())
continue continue
} }
if err := sds.process(currentBlock, parentBlock); err != nil { if err := sds.processStateDiff(currentBlock, parentBlock); err != nil {
log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err)
} }
case err := <-errCh: case err := <-errCh:
@ -140,8 +147,8 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
} }
} }
// process method builds the state diff payload from the current and parent block and streams it to listening subscriptions // processStateDiff method builds the state diff payload from the current and parent block and sends it to listening subscriptions
func (sds *Service) process(currentBlock, parentBlock *types.Block) error { func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error {
stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash()) stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash())
if err != nil { if err != nil {
return err return err
@ -170,6 +177,9 @@ func (sds *Service) process(currentBlock, parentBlock *types.Block) error {
// Subscribe is used by the API to subscribe to the StateDiffingService loop // Subscribe is used by the API to subscribe to the StateDiffingService loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) { func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
log.Info("Subscribing to the statediff service") log.Info("Subscribing to the statediff service")
if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
log.Info("State diffing subscription received; beginning statediff processing")
}
sds.Lock() sds.Lock()
sds.Subscriptions[id] = Subscription{ sds.Subscriptions[id] = Subscription{
PayloadChan: sub, PayloadChan: sub,
@ -187,6 +197,11 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id) return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id)
} }
delete(sds.Subscriptions, id) delete(sds.Subscriptions, id)
if len(sds.Subscriptions) == 0 {
if atomic.CompareAndSwapInt32(&sds.subscribers, 1, 0) {
log.Info("No more subscriptions; halting statediff processing")
}
}
sds.Unlock() sds.Unlock()
return nil return nil
} }
@ -208,7 +223,7 @@ func (sds *Service) Stop() error {
return nil return nil
} }
// send is used to fan out and serve a payload to any subscriptions // send is used to fan out and serve the statediff payload to all subscriptions
func (sds *Service) send(payload Payload) { func (sds *Service) send(payload Payload) {
sds.Lock() sds.Lock()
for id, sub := range sds.Subscriptions { for id, sub := range sds.Subscriptions {
@ -216,7 +231,21 @@ func (sds *Service) send(payload Payload) {
case sub.PayloadChan <- payload: case sub.PayloadChan <- payload:
log.Info(fmt.Sprintf("sending state diff payload to subscription %s", id)) log.Info(fmt.Sprintf("sending state diff payload to subscription %s", id))
default: default:
log.Info(fmt.Sprintf("unable to send payload to subscription %s", id)) log.Info(fmt.Sprintf("unable to send payload to subscription %s; channel has no receiver", id))
// in this case, try to close the bad subscription and remove it
select {
case sub.QuitChan <- true:
log.Info(fmt.Sprintf("closing subscription %s", id))
default:
log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id))
}
delete(sds.Subscriptions, id)
}
}
// If after removing all bad subscriptions we have none left, halt processing
if len(sds.Subscriptions) == 0 {
if atomic.CompareAndSwapInt32(&sds.subscribers, 1, 0) {
log.Info("No more subscriptions; halting statediff processing")
} }
} }
sds.Unlock() sds.Unlock()
@ -228,11 +257,11 @@ func (sds *Service) close() {
for id, sub := range sds.Subscriptions { for id, sub := range sds.Subscriptions {
select { select {
case sub.QuitChan <- true: case sub.QuitChan <- true:
delete(sds.Subscriptions, id)
log.Info(fmt.Sprintf("closing subscription %s", id)) log.Info(fmt.Sprintf("closing subscription %s", id))
default: default:
log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id)) log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id))
} }
delete(sds.Subscriptions, id)
} }
sds.Unlock() sds.Unlock()
} }

View File

@ -33,7 +33,7 @@ import (
func TestServiceLoop(t *testing.T) { func TestServiceLoop(t *testing.T) {
testErrorInChainEventLoop(t) testErrorInChainEventLoop(t)
testErrorInBlockLoop(t) //testErrorInBlockLoop(t)
} }
var ( var (
@ -76,10 +76,21 @@ func testErrorInChainEventLoop(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[rpc.ID]statediff.Subscription), Subscriptions: make(map[rpc.ID]statediff.Subscription),
} }
payloadChan := make(chan statediff.Payload)
quitChan := make(chan bool)
service.Subscribe(rpc.NewID(), payloadChan, quitChan)
testRoot2 = common.HexToHash("0xTestRoot2") testRoot2 = common.HexToHash("0xTestRoot2")
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2})
blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3})
// Need to have listeners on the channels or the subscription will be closed and the processing halted
go func() {
select {
case <-payloadChan:
case <-quitChan:
}
}()
service.Loop(eventsChannel) service.Loop(eventsChannel)
if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash())

View File

@ -66,7 +66,7 @@ func TestAPI(t *testing.T) {
serviceQuitChan := make(chan bool) serviceQuitChan := make(chan bool)
config := statediff.Config{ config := statediff.Config{
PathsAndProofs: true, PathsAndProofs: true,
AllNodes: false, IntermediateNodes: false,
} }
mockService := MockStateDiffService{ mockService := MockStateDiffService{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},

View File

@ -1,36 +0,0 @@
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package mocks
import "github.com/ethereum/go-ethereum/statediff"
// Publisher mock
type Publisher struct {
StateDiff *statediff.StateDiff
publisherError error
}
// PublishStateDiff mock method
func (publisher *Publisher) PublishStateDiff(sd *statediff.StateDiff) (string, error) {
publisher.StateDiff = sd
return "", publisher.publisherError
}
// SetPublisherError mock method
func (publisher *Publisher) SetPublisherError(err error) {
publisher.publisherError = err
}