From e1ed4951aa8828dd070d225b9e864392b71e1720 Mon Sep 17 00:00:00 2001 From: Ian Norden Date: Fri, 26 Apr 2019 11:18:23 -0500 Subject: [PATCH] refactoring state diff service and adding api which allows for streaming state diff payloads over an rpc websocket subscription --- statediff/api.go | 91 ++++++++ statediff/builder.go | 2 +- statediff/builder_test.go | 2 +- statediff/helpers.go | 2 +- statediff/service.go | 244 ++++++++++++++++++++ statediff/service/service.go | 120 ---------- statediff/service/service_test.go | 107 --------- statediff/service_test.go | 130 +++++++++++ statediff/testhelpers/helpers.go | 2 +- statediff/testhelpers/mocks/blockchain.go | 2 +- statediff/testhelpers/mocks/builder.go | 2 +- statediff/testhelpers/mocks/publisher.go | 2 +- statediff/testhelpers/mocks/service.go | 171 ++++++++++++++ statediff/testhelpers/mocks/service_test.go | 127 ++++++++++ statediff/testhelpers/test_data.go | 2 +- statediff/types.go | 2 +- 16 files changed, 772 insertions(+), 236 deletions(-) create mode 100644 statediff/api.go create mode 100644 statediff/service.go delete mode 100644 statediff/service/service.go delete mode 100644 statediff/service/service_test.go create mode 100644 statediff/service_test.go create mode 100644 statediff/testhelpers/mocks/service.go create mode 100644 statediff/testhelpers/mocks/service_test.go diff --git a/statediff/api.go b/statediff/api.go new file mode 100644 index 000000000..26f03c9c3 --- /dev/null +++ b/statediff/api.go @@ -0,0 +1,91 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package statediff + +import ( + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" +) + +// APIName is the namespace used for the state diffing service API +const APIName = "statediff" + +// APIVersion is the version of the state diffing service API +const APIVersion = "0.0.1" + +// PublicStateDiffAPI provides the a websocket service +// that can be used to stream out state diffs as they +// are produced by a full node +type PublicStateDiffAPI struct { + sds IService + + mu sync.Mutex + lastUsed map[string]time.Time // keeps track when a filter was polled for the last time. +} + +// NewPublicStateDiffAPI create a new state diff websocket streaming service. +func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { + return &PublicStateDiffAPI{ + sds: sds, + lastUsed: make(map[string]time.Time), + mu: sync.Mutex{}, + } +} + +// Subscribe is the public method to setup a subscription that fires off state-diff payloads as they are created +func (api *PublicStateDiffAPI) Subscribe(ctx context.Context) (*rpc.Subscription, error) { + // ensure that the RPC connection supports subscriptions + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + // create subscription and start waiting for statediff events + rpcSub := notifier.CreateSubscription() + + go func() { + // subscribe to events from the state diff service + payloadChannel := make(chan Payload) + quitChan := make(chan bool) + api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) + + // loop and await state diff payloads and relay them to the subscriber with then notifier + for { + select { + case packet := <-payloadChannel: + if err := notifier.Notify(rpcSub.ID, packet); err != nil { + log.Error("Failed to send state diff packet", "err", err) + } + case <-rpcSub.Err(): + err := api.sds.Unsubscribe(rpcSub.ID) + if err != nil { + log.Error("Failed to unsubscribe from the state diff service", err) + } + return + case <-quitChan: + // don't need to unsubscribe, statediff service does so before sending the quit signal + return + } + } + }() + + return rpcSub, nil +} diff --git a/statediff/builder.go b/statediff/builder.go index 4e8ab0e21..05e7d8572 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/builder_test.go b/statediff/builder_test.go index b4df80f81..5ecf647e3 100644 --- a/statediff/builder_test.go +++ b/statediff/builder_test.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/helpers.go b/statediff/helpers.go index 96f9ddf30..89852b55c 100644 --- a/statediff/helpers.go +++ b/statediff/helpers.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/service.go b/statediff/service.go new file mode 100644 index 000000000..d03a7b513 --- /dev/null +++ b/statediff/service.go @@ -0,0 +1,244 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package statediff + +import ( + "bytes" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/rlp" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" +) + +type blockChain interface { + SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription + GetBlockByHash(hash common.Hash) *types.Block + AddToStateDiffProcessedCollection(hash common.Hash) +} + +// IService is the state-diffing service interface +type IService interface { + // APIs(), Protocols(), Start() and Stop() + node.Service + // Main event loop for processing state diffs + Loop(chainEventCh chan core.ChainEvent) + // Method to subscribe to receive state diff processing output + Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) + // Method to unsubscribe from state diff processing + Unsubscribe(id rpc.ID) error +} + +// Service is the underlying struct for the state diffing service +type Service struct { + // Used to sync access to the Subscriptions + sync.Mutex + // Used to build the state diff objects + Builder Builder + // Used to subscribe to chain events (blocks) + BlockChain blockChain + // Used to signal shutdown of the service + QuitChan chan bool + // A mapping of rpc.IDs to their subscription channels + Subscriptions map[rpc.ID]Subscription +} + +// Subscription struct holds our subscription channels +type Subscription struct { + PayloadChan chan<- Payload + QuitChan chan<- bool +} + +// Payload packages the data to send to StateDiffingService subscriptions +type Payload struct { + BlockRlp []byte `json:"blockRlp" gencodec:"required"` + StateDiffRlp []byte `json:"stateDiffRlp" gencodec:"required"` + Err error `json:"error"` +} + +// NewStateDiffService creates a new StateDiffingService +func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain) (*Service, error) { + return &Service{ + Mutex: sync.Mutex{}, + BlockChain: blockChain, + Builder: NewBuilder(db, blockChain), + QuitChan: make(chan bool), + Subscriptions: make(map[rpc.ID]Subscription), + }, nil +} + +// Protocols exports the services p2p protocols, this service has none +func (sds *Service) Protocols() []p2p.Protocol { + return []p2p.Protocol{} +} + +// APIs returns the RPC descriptors the StateDiffingService offers +func (sds *Service) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: APIName, + Version: APIVersion, + Service: NewPublicStateDiffAPI(sds), + Public: true, + }, + } +} + +// Loop is the main processing method +func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { + + chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) + defer chainEventSub.Unsubscribe() + + blocksCh := make(chan *types.Block, 10) + errCh := chainEventSub.Err() + + go func() { + HandleChainEventChLoop: + for { + select { + //Notify chain event channel of events + case chainEvent := <-chainEventCh: + log.Debug("Event received from chainEventCh", "event", chainEvent) + blocksCh <- chainEvent.Block + //if node stopped + case err := <-errCh: + log.Warn("Error from chain event subscription, breaking loop.", "error", err) + close(sds.QuitChan) + break HandleChainEventChLoop + case <-sds.QuitChan: + break HandleChainEventChLoop + } + } + }() + + //loop through chain events until no more +HandleBlockChLoop: + for { + select { + case block := <-blocksCh: + currentBlock := block + parentHash := currentBlock.ParentHash() + parentBlock := sds.BlockChain.GetBlockByHash(parentHash) + if parentBlock == nil { + log.Error("Parent block is nil, skipping this block", + "parent block hash", parentHash.String(), + "current block number", currentBlock.Number()) + break HandleBlockChLoop + } + + stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number().Int64(), currentBlock.Hash()) + if err != nil { + log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) + } + rlpBuff := new(bytes.Buffer) + currentBlock.EncodeRLP(rlpBuff) + blockRlp := rlpBuff.Bytes() + stateDiffRlp, _ := rlp.EncodeToBytes(stateDiff) + payload := Payload{ + BlockRlp: blockRlp, + StateDiffRlp: stateDiffRlp, + Err: err, + } + // If we have any websocket subscription listening in, send the data to them + sds.send(payload) + case <-sds.QuitChan: + log.Debug("Quitting the statediff block channel") + sds.close() + return + } + } +} + +// Subscribe is used by the API to subscribe to the StateDiffingService loop +func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) { + log.Info("Subscribing to the statediff service") + sds.Lock() + sds.Subscriptions[id] = Subscription{ + PayloadChan: sub, + QuitChan: quitChan, + } + sds.Unlock() +} + +// Unsubscribe is used to unsubscribe to the StateDiffingService loop +func (sds *Service) Unsubscribe(id rpc.ID) error { + log.Info("Unsubscribing from the statediff service") + sds.Lock() + _, ok := sds.Subscriptions[id] + if !ok { + return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id) + } + delete(sds.Subscriptions, id) + sds.Unlock() + return nil +} + +// Start is used to begin the StateDiffingService +func (sds *Service) Start(*p2p.Server) error { + log.Info("Starting statediff service") + + chainEventCh := make(chan core.ChainEvent, 10) + go sds.Loop(chainEventCh) + + return nil +} + +// Stop is used to close down the StateDiffingService +func (sds *Service) Stop() error { + log.Info("Stopping statediff service") + close(sds.QuitChan) + return nil +} + +// send is used to fan out and serve a payload to any subscriptions +func (sds *Service) send(payload Payload) { + sds.Lock() + for id, sub := range sds.Subscriptions { + select { + case sub.PayloadChan <- payload: + log.Info("sending state diff payload to subscription %s", id) + default: + log.Info("unable to send payload to subscription %s; channel has no receiver", id) + } + } + sds.Unlock() +} + +// close is used to close all listening subscriptions +func (sds *Service) close() { + sds.Lock() + for id, sub := range sds.Subscriptions { + select { + case sub.QuitChan <- true: + delete(sds.Subscriptions, id) + log.Info("closing subscription %s", id) + default: + log.Info("unable to close subscription %s; channel has no receiver", id) + } + } + sds.Unlock() +} diff --git a/statediff/service/service.go b/statediff/service/service.go deleted file mode 100644 index 19ea0c644..000000000 --- a/statediff/service/service.go +++ /dev/null @@ -1,120 +0,0 @@ -package service - -import ( - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/p2p" - "github.com/ethereum/go-ethereum/rpc" - "github.com/ethereum/go-ethereum/statediff" - b "github.com/ethereum/go-ethereum/statediff/builder" - e "github.com/ethereum/go-ethereum/statediff/extractor" - p "github.com/ethereum/go-ethereum/statediff/publisher" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/event" - "github.com/ethereum/go-ethereum/log" -) - -type BlockChain interface { - SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription - GetBlockByHash(hash common.Hash) *types.Block - AddToStateDiffProcessedCollection(hash common.Hash) -} - -type StateDiffService struct { - Builder *b.Builder - Extractor e.Extractor - BlockChain BlockChain -} - -func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config statediff.Config) (*StateDiffService, error) { - builder := b.NewBuilder(db, blockChain) - publisher, err := p.NewPublisher(config) - if err != nil { - return nil, err - } - - extractor := e.NewExtractor(builder, publisher) - return &StateDiffService{ - BlockChain: blockChain, - Extractor: extractor, - }, nil -} - -func (StateDiffService) Protocols() []p2p.Protocol { - return []p2p.Protocol{} -} - -func (StateDiffService) APIs() []rpc.API { - return []rpc.API{} -} - -func (sds *StateDiffService) Loop(chainEventCh chan core.ChainEvent) { - chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) - defer chainEventSub.Unsubscribe() - - blocksCh := make(chan *types.Block, 10) - errCh := chainEventSub.Err() - quitCh := make(chan struct{}) - - go func() { - HandleChainEventChLoop: - for { - select { - //Notify chain event channel of events - case chainEvent := <-chainEventCh: - log.Debug("Event received from chainEventCh", "event", chainEvent) - blocksCh <- chainEvent.Block - //if node stopped - case err := <-errCh: - log.Warn("Error from chain event subscription, breaking loop.", "error", err) - break HandleChainEventChLoop - } - } - close(quitCh) - }() - - //loop through chain events until no more -HandleBlockChLoop: - for { - select { - case block := <-blocksCh: - currentBlock := block - parentHash := currentBlock.ParentHash() - parentBlock := sds.BlockChain.GetBlockByHash(parentHash) - if parentBlock == nil { - log.Error("Parent block is nil, skipping this block", - "parent block hash", parentHash.String(), - "current block number", currentBlock.Number()) - break HandleBlockChLoop - } - - stateDiffLocation, err := sds.Extractor.ExtractStateDiff(*parentBlock, *currentBlock) - if err != nil { - log.Error("Error extracting statediff", "block number", currentBlock.Number(), "error", err) - } else { - log.Info("Statediff extracted", "block number", currentBlock.Number(), "location", stateDiffLocation) - sds.BlockChain.AddToStateDiffProcessedCollection(parentBlock.Root()) - sds.BlockChain.AddToStateDiffProcessedCollection(currentBlock.Root()) - } - case <-quitCh: - log.Debug("Quitting the statediff block channel") - return - } - } -} - -func (sds *StateDiffService) Start(server *p2p.Server) error { - log.Info("Starting statediff service") - - chainEventCh := make(chan core.ChainEvent, 10) - go sds.Loop(chainEventCh) - - return nil -} - -func (StateDiffService) Stop() error { - log.Info("Stopping statediff service") - return nil -} diff --git a/statediff/service/service_test.go b/statediff/service/service_test.go deleted file mode 100644 index daf3445c3..000000000 --- a/statediff/service/service_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package service_test - -import ( - "math/big" - "math/rand" - "reflect" - "testing" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/types" - s "github.com/ethereum/go-ethereum/statediff/service" - "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" -) - -func TestServiceLoop(t *testing.T) { - testErrorInChainEventLoop(t) - testErrorInBlockLoop(t) -} - -var ( - eventsChannel = make(chan core.ChainEvent, 1) - - parentHeader1 = types.Header{Number: big.NewInt(rand.Int63())} - parentHeader2 = types.Header{Number: big.NewInt(rand.Int63())} - - parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil) - parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil) - - parentHash1 = parentBlock1.Hash() - parentHash2 = parentBlock2.Hash() - - header1 = types.Header{ParentHash: parentHash1} - header2 = types.Header{ParentHash: parentHash2} - header3 = types.Header{ParentHash: common.HexToHash("parent hash")} - - block1 = types.NewBlock(&header1, nil, nil, nil) - block2 = types.NewBlock(&header2, nil, nil, nil) - block3 = types.NewBlock(&header3, nil, nil, nil) - - event1 = core.ChainEvent{Block: block1} - event2 = core.ChainEvent{Block: block2} - event3 = core.ChainEvent{Block: block3} -) - -func testErrorInChainEventLoop(t *testing.T) { - //the first chain event causes and error (in blockchain mock) - extractor := mocks.Extractor{} - - blockChain := mocks.BlockChain{} - service := s.StateDiffService{ - Builder: nil, - Extractor: &extractor, - BlockChain: &blockChain, - } - - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) - blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) - service.Loop(eventsChannel) - - //parent and current blocks are passed to the extractor - expectedCurrentBlocks := []types.Block{*block1, *block2} - if !reflect.DeepEqual(extractor.CurrentBlocks, expectedCurrentBlocks) { - t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedCurrentBlocks) - } - expectedParentBlocks := []types.Block{*parentBlock1, *parentBlock2} - if !reflect.DeepEqual(extractor.ParentBlocks, expectedParentBlocks) { - t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedParentBlocks) - } - - //look up the parent block from its hash - expectedHashes := []common.Hash{block1.ParentHash(), block2.ParentHash()} - if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) { - t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes) - } -} - -func testErrorInBlockLoop(t *testing.T) { - //second block's parent block can't be found - extractor := mocks.Extractor{} - - blockChain := mocks.BlockChain{} - service := s.StateDiffService{ - Builder: nil, - Extractor: &extractor, - BlockChain: &blockChain, - } - - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil}) - blockChain.SetChainEvents([]core.ChainEvent{event1, event2}) - service.Loop(eventsChannel) - - //only the first current block (and it's parent) are passed to the extractor - expectedCurrentBlocks := []types.Block{*block1} - if !reflect.DeepEqual(extractor.CurrentBlocks, expectedCurrentBlocks) { - t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedCurrentBlocks) - } - expectedParentBlocks := []types.Block{*parentBlock1} - if !reflect.DeepEqual(extractor.ParentBlocks, expectedParentBlocks) { - t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", extractor.CurrentBlocks, expectedParentBlocks) - } -} diff --git a/statediff/service_test.go b/statediff/service_test.go new file mode 100644 index 000000000..d5edee04e --- /dev/null +++ b/statediff/service_test.go @@ -0,0 +1,130 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package statediff_test + +import ( + "bytes" + "math/big" + "math/rand" + "reflect" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" +) + +func TestServiceLoop(t *testing.T) { + testErrorInChainEventLoop(t) + testErrorInBlockLoop(t) +} + +var ( + eventsChannel = make(chan core.ChainEvent, 1) + + parentRoot1 = common.HexToHash("0x01") + parentRoot2 = common.HexToHash("0x02") + parentHeader1 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot1} + parentHeader2 = types.Header{Number: big.NewInt(rand.Int63()), Root: parentRoot2} + + parentBlock1 = types.NewBlock(&parentHeader1, nil, nil, nil) + parentBlock2 = types.NewBlock(&parentHeader2, nil, nil, nil) + + parentHash1 = parentBlock1.Hash() + parentHash2 = parentBlock2.Hash() + + testRoot1 = common.HexToHash("0x03") + testRoot2 = common.HexToHash("0x04") + testRoot3 = common.HexToHash("0x04") + header1 = types.Header{ParentHash: parentHash1, Root: testRoot1} + header2 = types.Header{ParentHash: parentHash2, Root: testRoot2} + header3 = types.Header{ParentHash: common.HexToHash("parent hash"), Root: testRoot3} + + testBlock1 = types.NewBlock(&header1, nil, nil, nil) + testBlock2 = types.NewBlock(&header2, nil, nil, nil) + testBlock3 = types.NewBlock(&header3, nil, nil, nil) + + event1 = core.ChainEvent{Block: testBlock1} + event2 = core.ChainEvent{Block: testBlock2} + event3 = core.ChainEvent{Block: testBlock3} +) + +func testErrorInChainEventLoop(t *testing.T) { + //the first chain event causes and error (in blockchain mock) + builder := mocks.Builder{} + blockChain := mocks.BlockChain{} + service := statediff.Service{ + Builder: &builder, + BlockChain: &blockChain, + QuitChan: make(chan bool), + Subscriptions: make(map[rpc.ID]statediff.Subscription), + } + testRoot2 = common.HexToHash("0xTestRoot2") + blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) + blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) + service.Loop(eventsChannel) + if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) + } + if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root()) + } + if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root()) + } + //look up the parent block from its hash + expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()} + if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes) + } +} + +func testErrorInBlockLoop(t *testing.T) { + //second block's parent block can't be found + builder := mocks.Builder{} + blockChain := mocks.BlockChain{} + service := statediff.Service{ + Builder: &builder, + BlockChain: &blockChain, + QuitChan: make(chan bool), + Subscriptions: make(map[rpc.ID]statediff.Subscription), + } + + blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil}) + blockChain.SetChainEvents([]core.ChainEvent{event1, event2}) + service.Loop(eventsChannel) + + if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock1.Hash()) + } + if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock1.Root().Bytes()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock1.Root()) + } + if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock1.Root().Bytes()) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock1.Root()) + } +} diff --git a/statediff/testhelpers/helpers.go b/statediff/testhelpers/helpers.go index 5126c6556..8f52bc8cc 100644 --- a/statediff/testhelpers/helpers.go +++ b/statediff/testhelpers/helpers.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/testhelpers/mocks/blockchain.go b/statediff/testhelpers/mocks/blockchain.go index cececde6f..f2c097d38 100644 --- a/statediff/testhelpers/mocks/blockchain.go +++ b/statediff/testhelpers/mocks/blockchain.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/testhelpers/mocks/builder.go b/statediff/testhelpers/mocks/builder.go index e9668629e..bb2e38f76 100644 --- a/statediff/testhelpers/mocks/builder.go +++ b/statediff/testhelpers/mocks/builder.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/testhelpers/mocks/publisher.go b/statediff/testhelpers/mocks/publisher.go index 3ea18abf0..6a6018746 100644 --- a/statediff/testhelpers/mocks/publisher.go +++ b/statediff/testhelpers/mocks/publisher.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/testhelpers/mocks/service.go b/statediff/testhelpers/mocks/service.go new file mode 100644 index 000000000..edb2d453d --- /dev/null +++ b/statediff/testhelpers/mocks/service.go @@ -0,0 +1,171 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mocks + +import ( + "bytes" + "errors" + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" +) + +// MockStateDiffService is a mock state diff service +type MockStateDiffService struct { + sync.Mutex + Builder statediff.Builder + ReturnProtocol []p2p.Protocol + ReturnAPIs []rpc.API + BlockChan chan *types.Block + ParentBlockChan chan *types.Block + QuitChan chan bool + Subscriptions map[rpc.ID]statediff.Subscription +} + +// Protocols mock method +func (sds *MockStateDiffService) Protocols() []p2p.Protocol { + return []p2p.Protocol{} +} + +// APIs mock method +func (sds *MockStateDiffService) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: statediff.APIName, + Version: statediff.APIVersion, + Service: statediff.NewPublicStateDiffAPI(sds), + Public: true, + }, + } +} + +// Loop mock method +func (sds *MockStateDiffService) Loop(chan core.ChainEvent) { + //loop through chain events until no more +HandleBlockChLoop: + for { + select { + case block := <-sds.BlockChan: + currentBlock := block + parentBlock := <-sds.ParentBlockChan + parentHash := parentBlock.Hash() + if parentBlock == nil { + log.Error("Parent block is nil, skipping this block", + "parent block hash", parentHash.String(), + "current block number", currentBlock.Number()) + break HandleBlockChLoop + } + + stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number().Int64(), currentBlock.Hash()) + if err != nil { + log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err) + } + rlpBuff := new(bytes.Buffer) + currentBlock.EncodeRLP(rlpBuff) + blockRlp := rlpBuff.Bytes() + stateDiffRlp, _ := rlp.EncodeToBytes(stateDiff) + payload := statediff.Payload{ + BlockRlp: blockRlp, + StateDiffRlp: stateDiffRlp, + Err: err, + } + // If we have any websocket subscription listening in, send the data to them + sds.send(payload) + case <-sds.QuitChan: + log.Debug("Quitting the statediff block channel") + sds.close() + return + } + } +} + +// Subscribe mock method +func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Payload, quitChan chan<- bool) { + log.Info("Subscribing to the statediff service") + sds.Lock() + sds.Subscriptions[id] = statediff.Subscription{ + PayloadChan: sub, + QuitChan: quitChan, + } + sds.Unlock() +} + +// Unsubscribe mock method +func (sds *MockStateDiffService) Unsubscribe(id rpc.ID) error { + log.Info("Unsubscribing from the statediff service") + sds.Lock() + _, ok := sds.Subscriptions[id] + if !ok { + return fmt.Errorf("cannot unsubscribe; subscription for id %s does not exist", id) + } + delete(sds.Subscriptions, id) + sds.Unlock() + return nil +} + +func (sds *MockStateDiffService) send(payload statediff.Payload) { + sds.Lock() + for id, sub := range sds.Subscriptions { + select { + case sub.PayloadChan <- payload: + log.Info("sending state diff payload to subscription %s", id) + default: + log.Info("unable to send payload to subscription %s; channel has no receiver", id) + } + } + sds.Unlock() +} + +func (sds *MockStateDiffService) close() { + sds.Lock() + for id, sub := range sds.Subscriptions { + select { + case sub.QuitChan <- true: + delete(sds.Subscriptions, id) + log.Info("closing subscription %s", id) + default: + log.Info("unable to close subscription %s; channel has no receiver", id) + } + } + sds.Unlock() +} + +// Start mock method +func (sds *MockStateDiffService) Start(server *p2p.Server) error { + log.Info("Starting statediff service") + if sds.ParentBlockChan == nil || sds.BlockChan == nil { + return errors.New("mock StateDiffingService requires preconfiguration with a MockParentBlockChan and MockBlockChan") + } + chainEventCh := make(chan core.ChainEvent, 10) + go sds.Loop(chainEventCh) + + return nil +} + +// Stop mock method +func (sds *MockStateDiffService) Stop() error { + log.Info("Stopping statediff service") + close(sds.QuitChan) + return nil +} diff --git a/statediff/testhelpers/mocks/service_test.go b/statediff/testhelpers/mocks/service_test.go new file mode 100644 index 000000000..1a4f83cf5 --- /dev/null +++ b/statediff/testhelpers/mocks/service_test.go @@ -0,0 +1,127 @@ +// Copyright 2019 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package mocks + +import ( + "bytes" + "math/big" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/rpc" + "github.com/ethereum/go-ethereum/statediff" + "github.com/ethereum/go-ethereum/statediff/testhelpers" +) + +var block0, block1 *types.Block +var burnLeafKey = testhelpers.AddressToLeafKey(common.HexToAddress("0x0")) +var emptyAccountDiffEventualMap = make(statediff.AccountDiffsMap) +var account1, _ = rlp.EncodeToBytes(state.Account{ + Nonce: uint64(0), + Balance: big.NewInt(10000), + CodeHash: common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes(), + Root: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), +}) +var burnAccount1, _ = rlp.EncodeToBytes(state.Account{ + Nonce: uint64(0), + Balance: big.NewInt(2000000000000000000), + CodeHash: common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes(), + Root: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), +}) +var bankAccount1, _ = rlp.EncodeToBytes(state.Account{ + Nonce: uint64(1), + Balance: big.NewInt(testhelpers.TestBankFunds.Int64() - 10000), + CodeHash: common.HexToHash("0xc5d2460186f7233c927e7db2dcc703c0e500b653ca82273b7bfad8045d85a470").Bytes(), + Root: common.HexToHash("0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421"), +}) + +func TestAPI(t *testing.T) { + _, blockMap, chain := testhelpers.MakeChain(3, testhelpers.Genesis) + defer chain.Stop() + block0Hash := common.HexToHash("0xd1721cfd0b29c36fd7a68f25c128e86413fb666a6e1d68e89b875bd299262661") + block1Hash := common.HexToHash("0xbbe88de60ba33a3f18c0caa37d827bfb70252e19e40a07cd34041696c35ecb1a") + block0 = blockMap[block0Hash] + block1 = blockMap[block1Hash] + blockChan := make(chan *types.Block) + parentBlockChain := make(chan *types.Block) + serviceQuitChan := make(chan bool) + mockService := MockStateDiffService{ + Mutex: sync.Mutex{}, + Builder: statediff.NewBuilder(testhelpers.Testdb, chain), + BlockChan: blockChan, + ParentBlockChan: parentBlockChain, + QuitChan: serviceQuitChan, + Subscriptions: make(map[rpc.ID]statediff.Subscription), + } + mockService.Start(nil) + id := rpc.NewID() + payloadChan := make(chan statediff.Payload) + quitChan := make(chan bool) + mockService.Subscribe(id, payloadChan, quitChan) + blockChan <- block1 + parentBlockChain <- block0 + expectedBlockRlp, _ := rlp.EncodeToBytes(block1) + expectedStateDiff := &statediff.StateDiff{ + BlockNumber: block1.Number().Int64(), + BlockHash: block1.Hash(), + CreatedAccounts: statediff.AccountDiffsMap{ + testhelpers.Account1LeafKey: { + Key: testhelpers.Account1LeafKey.Bytes(), + Value: account1, + Proof: [][]byte{{248, 113, 160, 87, 118, 82, 182, 37, 183, 123, 219, 91, 247, 123, 196, 63, 49, 37, 202, 215, 70, 77, 103, 157, 21, 117, 86, 82, 119, 211, 97, 27, 128, 83, 231, 128, 128, 128, 128, 160, 254, 136, 159, 16, 229, 219, 143, 44, 43, 243, 85, 146, 129, 82, 161, 127, 110, 59, 185, 154, 146, 65, 172, 109, 132, 199, 126, 98, 100, 80, 156, 121, 128, 128, 128, 128, 128, 128, 128, 128, 160, 17, 219, 12, 218, 52, 168, 150, 218, 190, 182, 131, 155, 176, 106, 56, 244, 149, 20, 207, 164, 134, 67, 89, 132, 235, 1, 59, 125, 249, 238, 133, 197, 128, 128}, + {248, 107, 160, 57, 38, 219, 105, 170, 206, 213, 24, 233, 185, 240, 244, 52, 164, 115, 231, 23, 65, 9, 201, 67, 84, 139, 184, 242, 59, 228, 28, 167, 109, 154, 210, 184, 72, 248, 70, 128, 130, 39, 16, 160, 86, 232, 31, 23, 27, 204, 85, 166, 255, 131, 69, 230, 146, 192, 248, 110, 91, 72, 224, 27, 153, 108, 173, 192, 1, 98, 47, 181, 227, 99, 180, 33, 160, 197, 210, 70, 1, 134, 247, 35, 60, 146, 126, 125, 178, 220, 199, 3, 192, 229, 0, 182, 83, 202, 130, 39, 59, 123, 250, 216, 4, 93, 133, 164, 112}}, + Path: []byte{14, 9, 2, 6, 13, 11, 6, 9, 10, 10, 12, 14, 13, 5, 1, 8, 14, 9, 11, 9, 15, 0, 15, 4, 3, 4, 10, 4, 7, 3, 14, 7, 1, 7, 4, 1, 0, 9, 12, 9, 4, 3, 5, 4, 8, 11, 11, 8, 15, 2, 3, 11, 14, 4, 1, 12, 10, 7, 6, 13, 9, 10, 13, 2, 16}, + Storage: []statediff.StorageDiff{}, + }, + burnLeafKey: { + Key: burnLeafKey.Bytes(), + Value: burnAccount1, + Proof: [][]byte{{248, 113, 160, 87, 118, 82, 182, 37, 183, 123, 219, 91, 247, 123, 196, 63, 49, 37, 202, 215, 70, 77, 103, 157, 21, 117, 86, 82, 119, 211, 97, 27, 128, 83, 231, 128, 128, 128, 128, 160, 254, 136, 159, 16, 229, 219, 143, 44, 43, 243, 85, 146, 129, 82, 161, 127, 110, 59, 185, 154, 146, 65, 172, 109, 132, 199, 126, 98, 100, 80, 156, 121, 128, 128, 128, 128, 128, 128, 128, 128, 160, 17, 219, 12, 218, 52, 168, 150, 218, 190, 182, 131, 155, 176, 106, 56, 244, 149, 20, 207, 164, 134, 67, 89, 132, 235, 1, 59, 125, 249, 238, 133, 197, 128, 128}, + {248, 113, 160, 51, 128, 199, 183, 174, 129, 165, 142, 185, 141, 156, 120, 222, 74, 31, 215, 253, 149, 53, 252, 149, 62, 210, 190, 96, 45, 170, 164, 23, 103, 49, 42, 184, 78, 248, 76, 128, 136, 27, 193, 109, 103, 78, 200, 0, 0, 160, 86, 232, 31, 23, 27, 204, 85, 166, 255, 131, 69, 230, 146, 192, 248, 110, 91, 72, 224, 27, 153, 108, 173, 192, 1, 98, 47, 181, 227, 99, 180, 33, 160, 197, 210, 70, 1, 134, 247, 35, 60, 146, 126, 125, 178, 220, 199, 3, 192, 229, 0, 182, 83, 202, 130, 39, 59, 123, 250, 216, 4, 93, 133, 164, 112}}, + Path: []byte{5, 3, 8, 0, 12, 7, 11, 7, 10, 14, 8, 1, 10, 5, 8, 14, 11, 9, 8, 13, 9, 12, 7, 8, 13, 14, 4, 10, 1, 15, 13, 7, 15, 13, 9, 5, 3, 5, 15, 12, 9, 5, 3, 14, 13, 2, 11, 14, 6, 0, 2, 13, 10, 10, 10, 4, 1, 7, 6, 7, 3, 1, 2, 10, 16}, + Storage: []statediff.StorageDiff{}, + }, + }, + DeletedAccounts: emptyAccountDiffEventualMap, + UpdatedAccounts: statediff.AccountDiffsMap{ + testhelpers.BankLeafKey: { + Key: testhelpers.BankLeafKey.Bytes(), + Value: bankAccount1, + Proof: [][]byte{{248, 113, 160, 87, 118, 82, 182, 37, 183, 123, 219, 91, 247, 123, 196, 63, 49, 37, 202, 215, 70, 77, 103, 157, 21, 117, 86, 82, 119, 211, 97, 27, 128, 83, 231, 128, 128, 128, 128, 160, 254, 136, 159, 16, 229, 219, 143, 44, 43, 243, 85, 146, 129, 82, 161, 127, 110, 59, 185, 154, 146, 65, 172, 109, 132, 199, 126, 98, 100, 80, 156, 121, 128, 128, 128, 128, 128, 128, 128, 128, 160, 17, 219, 12, 218, 52, 168, 150, 218, 190, 182, 131, 155, 176, 106, 56, 244, 149, 20, 207, 164, 134, 67, 89, 132, 235, 1, 59, 125, 249, 238, 133, 197, 128, 128}, + {248, 109, 160, 48, 191, 73, 244, 64, 161, 205, 5, 39, 228, 208, 110, 39, 101, 101, 76, 15, 86, 69, 34, 87, 81, 109, 121, 58, 155, 141, 96, 77, 207, 223, 42, 184, 74, 248, 72, 1, 132, 5, 245, 185, 240, 160, 86, 232, 31, 23, 27, 204, 85, 166, 255, 131, 69, 230, 146, 192, 248, 110, 91, 72, 224, 27, 153, 108, 173, 192, 1, 98, 47, 181, 227, 99, 180, 33, 160, 197, 210, 70, 1, 134, 247, 35, 60, 146, 126, 125, 178, 220, 199, 3, 192, 229, 0, 182, 83, 202, 130, 39, 59, 123, 250, 216, 4, 93, 133, 164, 112}}, + Path: []byte{0, 0, 11, 15, 4, 9, 15, 4, 4, 0, 10, 1, 12, 13, 0, 5, 2, 7, 14, 4, 13, 0, 6, 14, 2, 7, 6, 5, 6, 5, 4, 12, 0, 15, 5, 6, 4, 5, 2, 2, 5, 7, 5, 1, 6, 13, 7, 9, 3, 10, 9, 11, 8, 13, 6, 0, 4, 13, 12, 15, 13, 15, 2, 10, 16}, + Storage: []statediff.StorageDiff{}, + }, + }, + } + expectedStateDiffRlp, _ := rlp.EncodeToBytes(expectedStateDiff) + select { + case payload := <-payloadChan: + if !bytes.Equal(payload.BlockRlp, expectedBlockRlp) { + t.Errorf("payload does not have expected block\r\actual: %v\r\nexpected: %v", payload.BlockRlp, expectedBlockRlp) + } + if !bytes.Equal(payload.StateDiffRlp, expectedStateDiffRlp) { + t.Errorf("payload does not have expected state diff\r\actual: %v\r\nexpected: %v", payload.StateDiffRlp, expectedStateDiffRlp) + } + case <-quitChan: + t.Errorf("channel quit before delivering payload") + } +} diff --git a/statediff/testhelpers/test_data.go b/statediff/testhelpers/test_data.go index 17dac8473..1203be611 100644 --- a/statediff/testhelpers/test_data.go +++ b/statediff/testhelpers/test_data.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify diff --git a/statediff/types.go b/statediff/types.go index 41e83dddb..816aae6d6 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2019 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify