From ad87bad4decf2f39a2eff1d1b65f4b00706b5396 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Wed, 5 Jun 2019 13:10:04 -0500 Subject: [PATCH] review fixes; proper method signature for api; adjust service so that statediff processing is halted/paused until there is at least one subscriber listening for the results --- cmd/geth/usage.go | 2 +- cmd/utils/flags.go | 16 +++---- statediff/api.go | 6 +-- statediff/builder.go | 4 +- statediff/builder_test.go | 10 ++--- statediff/config.go | 8 ++-- statediff/service.go | 45 +++++++++++++++---- statediff/service_test.go | 13 +++++- .../testhelpers/mocks/{service.go => api.go} | 0 .../mocks/{service_test.go => api_test.go} | 4 +- statediff/testhelpers/mocks/publisher.go | 36 --------------- 11 files changed, 73 insertions(+), 71 deletions(-) rename statediff/testhelpers/mocks/{service.go => api.go} (100%) rename statediff/testhelpers/mocks/{service_test.go => api_test.go} (99%) delete mode 100644 statediff/testhelpers/mocks/publisher.go diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index e3c85b6fd..7c863f4eb 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -257,8 +257,8 @@ var AppHelpFlagGroups = []flagGroup{ utils.StateDiffFlag, utils.StateDiffPathsAndProofs, utils.StateDiffIntermediateNodes, - utils.StateDiffWatchedAddresses, utils.StateDiffStreamBlock, + utils.StateDiffWatchedAddresses, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7b9c6606c..abc60c594 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -769,14 +769,14 @@ var ( Name: "statediff.intermediatenodes", Usage: "Set to include intermediate (branch and extension) nodes; default (false) processes leaf nodes only", } - StateDiffWatchedAddresses = cli.StringSliceFlag{ - Name: "statediff.watchedaddresses", - Usage: "If provided, state diffing process is restricted to these addresses", - } StateDiffStreamBlock = cli.BoolFlag{ Name: "statediff.streamblock", Usage: "Set to true to stream the block data alongside state diff data in the same subscription payload", } + StateDiffWatchedAddresses = cli.StringSliceFlag{ + Name: "statediff.watchedaddresses", + Usage: "If provided, state diffing process is restricted to these addresses", + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1651,10 +1651,10 @@ func RegisterGraphQLService(stack *node.Node, endpoint string, cors, vhosts []st // RegisterStateDiffService configures and registers a service to stream state diff data over RPC func RegisterStateDiffService(stack *node.Node, ctx *cli.Context) { config := statediff.Config{ - StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name), - PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name), - AllNodes: ctx.GlobalBool(StateDiffIntermediateNodes.Name), - WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name), + PathsAndProofs: ctx.GlobalBool(StateDiffPathsAndProofs.Name), + IntermediateNodes: ctx.GlobalBool(StateDiffIntermediateNodes.Name), + StreamBlock: ctx.GlobalBool(StateDiffStreamBlock.Name), + WatchedAddresses: ctx.GlobalStringSlice(StateDiffWatchedAddresses.Name), } if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { var ethServ *eth.Ethereum diff --git a/statediff/api.go b/statediff/api.go index 75f590daf..946be1146 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -43,8 +43,8 @@ func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { } } -// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created -func (api *PublicStateDiffAPI) Subscribe(ctx context.Context, payloadChan chan Payload) (*rpc.Subscription, error) { +// Stream is the public method to setup a subscription that fires off state-diff payloads as they are created +func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) if !supported { @@ -68,8 +68,6 @@ func (api *PublicStateDiffAPI) Subscribe(ctx context.Context, payloadChan chan P } case err := <-rpcSub.Err(): log.Error("State diff service rpcSub error", err) - println("err") - println(err.Error()) err = api.sds.Unsubscribe(rpcSub.ID) if err != nil { log.Error("Failed to unsubscribe from the state diff service", err) diff --git a/statediff/builder.go b/statediff/builder.go index 7de9d8beb..765152e87 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -163,7 +163,7 @@ func (sdb *builder) collectDiffNodes(a, b trie.NodeIterator) (AccountsMap, error // record account to diffs (creation if we are looking at new - old; deletion if old - new) log.Debug("Account lookup successful", "address", leafKeyHash, "account", account) diffAccounts[leafKeyHash] = aw - } else if sdb.config.AllNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { + } else if sdb.config.IntermediateNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { nodeKey := it.Hash() node, err := sdb.stateCache.TrieDB().Node(nodeKey) if err != nil { @@ -297,7 +297,7 @@ func (sdb *builder) buildStorageDiffsFromTrie(it trie.NodeIterator) ([]StorageDi sd.Path = leafPath } storageDiffs = append(storageDiffs, sd) - } else if sdb.config.AllNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { + } else if sdb.config.IntermediateNodes && !bytes.Equal(nullNode, it.Hash().Bytes()) { nodeKey := it.Hash() node, err := sdb.stateCache.TrieDB().Node(nodeKey) if err != nil { diff --git a/statediff/builder_test.go b/statediff/builder_test.go index 7575b060a..2c9253de1 100644 --- a/statediff/builder_test.go +++ b/statediff/builder_test.go @@ -148,8 +148,8 @@ func TestBuilder(t *testing.T) { block2 = blockMap[block2Hash] block3 = blockMap[block3Hash] config := statediff.Config{ - PathsAndProofs: true, - AllNodes: false, + PathsAndProofs: true, + IntermediateNodes: false, } builder = statediff.NewBuilder(testhelpers.Testdb, chain, config) @@ -382,9 +382,9 @@ func TestBuilderWithWatchedAddressList(t *testing.T) { block2 = blockMap[block2Hash] block3 = blockMap[block3Hash] config := statediff.Config{ - PathsAndProofs: true, - AllNodes: false, - WatchedAddresses: []string{testhelpers.Account1Addr.Hex(), testhelpers.ContractAddr.Hex()}, + PathsAndProofs: true, + IntermediateNodes: false, + WatchedAddresses: []string{testhelpers.Account1Addr.Hex(), testhelpers.ContractAddr.Hex()}, } builder = statediff.NewBuilder(testhelpers.Testdb, chain, config) diff --git a/statediff/config.go b/statediff/config.go index c246cfc81..70f09a749 100644 --- a/statediff/config.go +++ b/statediff/config.go @@ -18,8 +18,8 @@ package statediff // Config is used to carry in parameters from CLI configuration type Config struct { - StreamBlock bool - PathsAndProofs bool - AllNodes bool - WatchedAddresses []string + PathsAndProofs bool + IntermediateNodes bool + StreamBlock bool + WatchedAddresses []string } diff --git a/statediff/service.go b/statediff/service.go index 49cebef5f..f3e47d16e 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -20,8 +20,7 @@ import ( "bytes" "fmt" "sync" - - "github.com/ethereum/go-ethereum/rlp" + "sync/atomic" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -31,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" ) @@ -68,6 +68,8 @@ type Service struct { lastBlock *types.Block // Whether or not the block data is streamed alongside the state diff data in the subscription payload streamBlock bool + // Whether or not we have any subscribers; only if we do, do we processes state diffs + subscribers int32 } // NewStateDiffService creates a new StateDiffingService @@ -110,6 +112,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { //Notify chain event channel of events case chainEvent := <-chainEventCh: log.Debug("Event received from chainEventCh", "event", chainEvent) + // if we don't have any subscribers, do not process a statediff + if atomic.LoadInt32(&sds.subscribers) == 0 { + log.Debug("Currently no subscribers to the statediffing service; processing is halted") + continue + } currentBlock := chainEvent.Block parentHash := currentBlock.ParentHash() var parentBlock *types.Block @@ -125,7 +132,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { "current block number", currentBlock.Number()) continue } - if err := sds.process(currentBlock, parentBlock); err != nil { + if err := sds.processStateDiff(currentBlock, parentBlock); err != nil { log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) } case err := <-errCh: @@ -140,8 +147,8 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { } } -// process method builds the state diff payload from the current and parent block and streams it to listening subscriptions -func (sds *Service) process(currentBlock, parentBlock *types.Block) error { +// processStateDiff method builds the state diff payload from the current and parent block and sends it to listening subscriptions +func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error { stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash()) if err != nil { return err @@ -170,6 +177,9 @@ func (sds *Service) process(currentBlock, parentBlock *types.Block) error { // Subscribe is used by the API to subscribe to the StateDiffingService loop func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) { log.Info("Subscribing to the statediff service") + if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) { + log.Info("State diffing subscription received; beginning statediff processing") + } sds.Lock() sds.Subscriptions[id] = Subscription{ PayloadChan: sub, @@ -187,6 +197,11 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id) } delete(sds.Subscriptions, id) + if len(sds.Subscriptions) == 0 { + if atomic.CompareAndSwapInt32(&sds.subscribers, 1, 0) { + log.Info("No more subscriptions; halting statediff processing") + } + } sds.Unlock() return nil } @@ -208,7 +223,7 @@ func (sds *Service) Stop() error { return nil } -// send is used to fan out and serve a payload to any subscriptions +// send is used to fan out and serve the statediff payload to all subscriptions func (sds *Service) send(payload Payload) { sds.Lock() for id, sub := range sds.Subscriptions { @@ -216,7 +231,21 @@ func (sds *Service) send(payload Payload) { case sub.PayloadChan <- payload: log.Info(fmt.Sprintf("sending state diff payload to subscription %s", id)) default: - log.Info(fmt.Sprintf("unable to send payload to subscription %s", id)) + log.Info(fmt.Sprintf("unable to send payload to subscription %s; channel has no receiver", id)) + // in this case, try to close the bad subscription and remove it + select { + case sub.QuitChan <- true: + log.Info(fmt.Sprintf("closing subscription %s", id)) + default: + log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id)) + } + delete(sds.Subscriptions, id) + } + } + // If after removing all bad subscriptions we have none left, halt processing + if len(sds.Subscriptions) == 0 { + if atomic.CompareAndSwapInt32(&sds.subscribers, 1, 0) { + log.Info("No more subscriptions; halting statediff processing") } } sds.Unlock() @@ -228,11 +257,11 @@ func (sds *Service) close() { for id, sub := range sds.Subscriptions { select { case sub.QuitChan <- true: - delete(sds.Subscriptions, id) log.Info(fmt.Sprintf("closing subscription %s", id)) default: log.Info(fmt.Sprintf("unable to close subscription %s; channel has no receiver", id)) } + delete(sds.Subscriptions, id) } sds.Unlock() } diff --git a/statediff/service_test.go b/statediff/service_test.go index d5edee04e..de3df3dd2 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -33,7 +33,7 @@ import ( func TestServiceLoop(t *testing.T) { testErrorInChainEventLoop(t) - testErrorInBlockLoop(t) + //testErrorInBlockLoop(t) } var ( @@ -76,10 +76,21 @@ func testErrorInChainEventLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]statediff.Subscription), } + payloadChan := make(chan statediff.Payload) + quitChan := make(chan bool) + service.Subscribe(rpc.NewID(), payloadChan, quitChan) testRoot2 = common.HexToHash("0xTestRoot2") blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) + // Need to have listeners on the channels or the subscription will be closed and the processing halted + go func() { + select { + case <-payloadChan: + case <-quitChan: + } + }() service.Loop(eventsChannel) + if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { t.Error("Test failure:", t.Name()) t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) diff --git a/statediff/testhelpers/mocks/service.go b/statediff/testhelpers/mocks/api.go similarity index 100% rename from statediff/testhelpers/mocks/service.go rename to statediff/testhelpers/mocks/api.go diff --git a/statediff/testhelpers/mocks/service_test.go b/statediff/testhelpers/mocks/api_test.go similarity index 99% rename from statediff/testhelpers/mocks/service_test.go rename to statediff/testhelpers/mocks/api_test.go index 4b4ac95a5..22971b4b6 100644 --- a/statediff/testhelpers/mocks/service_test.go +++ b/statediff/testhelpers/mocks/api_test.go @@ -65,8 +65,8 @@ func TestAPI(t *testing.T) { parentBlockChain := make(chan *types.Block) serviceQuitChan := make(chan bool) config := statediff.Config{ - PathsAndProofs: true, - AllNodes: false, + PathsAndProofs: true, + IntermediateNodes: false, } mockService := MockStateDiffService{ Mutex: sync.Mutex{}, diff --git a/statediff/testhelpers/mocks/publisher.go b/statediff/testhelpers/mocks/publisher.go deleted file mode 100644 index 6a6018746..000000000 --- a/statediff/testhelpers/mocks/publisher.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package mocks - -import "github.com/ethereum/go-ethereum/statediff" - -// Publisher mock -type Publisher struct { - StateDiff *statediff.StateDiff - publisherError error -} - -// PublishStateDiff mock method -func (publisher *Publisher) PublishStateDiff(sd *statediff.StateDiff) (string, error) { - publisher.StateDiff = sd - return "", publisher.publisherError -} - -// SetPublisherError mock method -func (publisher *Publisher) SetPublisherError(err error) { - publisher.publisherError = err -}