relay receipts with the rest of the data + review fixes/changes

This commit is contained in:
Ian Norden 2019-07-18 13:55:37 -05:00 committed by Rob Mulholand
parent 00cc1f89ff
commit ca79f6ef98
11 changed files with 175 additions and 121 deletions

View File

@ -1345,7 +1345,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
break break
} }
if bc.cacheConfig.ProcessingStateDiffs { if bc.cacheConfig.ProcessingStateDiffs {
if !bc.allowedRootToBeDereferenced(root.(common.Hash)) { if !bc.rootAllowedToBeDereferenced(root.(common.Hash)) {
bc.triegc.Push(root, number) bc.triegc.Push(root, number)
break break
} else { } else {
@ -1414,7 +1414,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
// since we need the state tries of the current block and its parent in-memory // since we need the state tries of the current block and its parent in-memory
// in order to process statediffs, we should avoid dereferencing roots until // in order to process statediffs, we should avoid dereferencing roots until
// its statediff and its child have been processed // its statediff and its child have been processed
func (bc *BlockChain) allowedRootToBeDereferenced(root common.Hash) bool { func (bc *BlockChain) rootAllowedToBeDereferenced(root common.Hash) bool {
diffProcessedForSelfAndChildCount := 2 diffProcessedForSelfAndChildCount := 2
count := bc.stateDiffsProcessed[root] count := bc.stateDiffsProcessed[root]
return count >= diffProcessedForSelfAndChildCount return count >= diffProcessedForSelfAndChildCount

View File

@ -29,21 +29,21 @@ const APIName = "statediff"
// APIVersion is the version of the state diffing service API // APIVersion is the version of the state diffing service API
const APIVersion = "0.0.1" const APIVersion = "0.0.1"
// PublicStateDiffAPI provides the a websocket service // PublicStateDiffAPI provides an RPC subscription interface
// that can be used to stream out state diffs as they // that can be used to stream out state diffs as they
// are produced by a full node // are produced by a full node
type PublicStateDiffAPI struct { type PublicStateDiffAPI struct {
sds IService sds IService
} }
// NewPublicStateDiffAPI create a new state diff websocket streaming service. // NewPublicStateDiffAPI creates an rpc subscription interface for the underlying statediff service
func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
return &PublicStateDiffAPI{ return &PublicStateDiffAPI{
sds: sds, sds: sds,
} }
} }
// Stream is the public method to setup a subscription that fires off state-diff payloads as they are created // Stream is the public method to setup a subscription that fires off statediff service payloads as they are created
func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) { func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions // ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx) notifier, supported := rpc.NotifierFromContext(ctx)
@ -51,7 +51,7 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
return nil, rpc.ErrNotificationsUnsupported return nil, rpc.ErrNotificationsUnsupported
} }
// create subscription and start waiting for statediff events // create subscription and start waiting for events
rpcSub := notifier.CreateSubscription() rpcSub := notifier.CreateSubscription()
go func() { go func() {
@ -59,11 +59,11 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
payloadChannel := make(chan Payload, chainEventChanSize) payloadChannel := make(chan Payload, chainEventChanSize)
quitChan := make(chan bool, 1) quitChan := make(chan bool, 1)
api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan)
// loop and await state diff payloads and relay them to the subscriber with the notifier // loop and await payloads and relay them to the subscriber with the notifier
for { for {
select { select {
case packet := <-payloadChannel: case payload := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil { if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error()) log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID) unSubErr := api.sds.Unsubscribe(rpcSub.ID)
if unSubErr != nil { if unSubErr != nil {
@ -81,7 +81,7 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
return return
} }
case <-quitChan: case <-quitChan:
// don't need to unsubscribe, statediff service does so before sending the quit signal // don't need to unsubscribe, service does so before sending the quit signal
return return
} }
} }

View File

@ -25,7 +25,6 @@ import (
"math/big" "math/big"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -58,7 +57,7 @@ func NewBuilder(db ethdb.Database, blockChain *core.BlockChain, config Config) B
} }
} }
// BuildStateDiff builds a StateDiff object from two blocks // BuildStateDiff builds a statediff object from two blocks
func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, blockNumber *big.Int, blockHash common.Hash) (StateDiff, error) { func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, blockNumber *big.Int, blockHash common.Hash) (StateDiff, error) {
// Generate tries for old and new states // Generate tries for old and new states
sdb.stateCache = sdb.blockChain.StateCache() sdb.stateCache = sdb.blockChain.StateCache()
@ -115,8 +114,9 @@ func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, block
}, nil }, nil
} }
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
func (sdb *builder) isWatchedAddress(hashKey []byte) bool { func (sdb *builder) isWatchedAddress(hashKey []byte) bool {
// If we aren't watching any addresses, we are watching everything // If we aren't watching any specific addresses, we are watching everything
if len(sdb.config.WatchedAddresses) == 0 { if len(sdb.config.WatchedAddresses) == 0 {
return true return true
} }
@ -318,15 +318,3 @@ func (sdb *builder) buildStorageDiffsFromTrie(it trie.NodeIterator) ([]StorageDi
return storageDiffs, nil return storageDiffs, nil
} }
func (sdb *builder) addressByPath(path []byte) (*common.Address, error) {
log.Debug("Looking up address from path", "path", hexutil.Encode(append([]byte("secure-key-"), path...)))
addrBytes, err := sdb.chainDB.Get(append([]byte("secure-key-"), hexToKeyBytes(path)...))
if err != nil {
log.Error("Error looking up address via path", "path", hexutil.Encode(append([]byte("secure-key-"), path...)), "error", err)
return nil, err
}
addr := common.BytesToAddress(addrBytes)
log.Debug("Address found", "Address", addr)
return &addr, nil
}

View File

@ -15,11 +15,11 @@
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/* /*
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
Package statediff provides an auxiliary service that processes state diff objects from incoming chain events, Package statediff provides an auxiliary service that processes state diff objects from incoming chain events,
relaying the objects to any rpc subscriptions. relaying the objects to any rpc subscriptions.
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
The service is spun up using the below CLI flags The service is spun up using the below CLI flags
--statediff: boolean flag, turns on the service --statediff: boolean flag, turns on the service
--statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs. --statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs.
@ -27,7 +27,16 @@ The service is spun up using the below CLI flags
--statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes. --statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes.
--statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3 --statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3
If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. The IPC-RPC server is turned on by default.
The statediffing services works only with `--syncmode="full", but -importantly- does not require garbage collection to be turned off (does not require an archival node).
e.g.
$ ./geth --statediff --statediff.streamblock --ws --syncmode "full"
This starts up the geth node in full sync mode, starts up the statediffing service, and opens up the websocket endpoint to subscribe to the service.
Because the "streamblock" flag has been turned on, the service will strean out block data (headers, transactions, and receipts) along with the diffed state and storage leafs.
Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method, Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method,
with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream". with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream".
@ -41,7 +50,7 @@ for {
select { select {
case stateDiffPayload := <- stateDiffPayloadChan: case stateDiffPayload := <- stateDiffPayloadChan:
processPayload(stateDiffPayload) processPayload(stateDiffPayload)
case err := <= rpcSub.Err(): case err := <- rpcSub.Err():
log.Error(err) log.Error(err)
} }
} }

View File

@ -37,7 +37,7 @@ func sortKeys(data AccountsMap) []string {
return keys return keys
} }
// BytesToNiblePath // bytesToNiblePath converts the byte representation of a path to its string representation
func bytesToNiblePath(path []byte) string { func bytesToNiblePath(path []byte) string {
if hasTerm(path) { if hasTerm(path) {
path = path[:len(path)-1] path = path[:len(path)-1]
@ -53,6 +53,8 @@ func bytesToNiblePath(path []byte) string {
return nibblePath return nibblePath
} }
// findIntersection finds the set of strings from both arrays that are equivalent (same key as same index)
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
func findIntersection(a, b []string) []string { func findIntersection(a, b []string) []string {
lenA := len(a) lenA := len(a)
lenB := len(b) lenB := len(b)
@ -63,13 +65,13 @@ func findIntersection(a, b []string) []string {
} }
for { for {
switch strings.Compare(a[iOfA], b[iOfB]) { switch strings.Compare(a[iOfA], b[iOfB]) {
// a[iOfA] < b[iOfB] // -1 when a[iOfA] < b[iOfB]
case -1: case -1:
iOfA++ iOfA++
if iOfA >= lenA { if iOfA >= lenA {
return updates return updates
} }
// a[iOfA] == b[iOfB] // 0 when a[iOfA] == b[iOfB]
case 0: case 0:
updates = append(updates, a[iOfA]) updates = append(updates, a[iOfA])
iOfA++ iOfA++
@ -77,7 +79,7 @@ func findIntersection(a, b []string) []string {
if iOfA >= lenA || iOfB >= lenB { if iOfA >= lenA || iOfB >= lenB {
return updates return updates
} }
// a[iOfA] > b[iOfB] // 1 when a[iOfA] > b[iOfB]
case 1: case 1:
iOfB++ iOfB++
if iOfB >= lenB { if iOfB >= lenB {
@ -88,30 +90,11 @@ func findIntersection(a, b []string) []string {
} }
// pathToStr converts the NodeIterator path to a string representation
func pathToStr(it trie.NodeIterator) string { func pathToStr(it trie.NodeIterator) string {
return bytesToNiblePath(it.Path()) return bytesToNiblePath(it.Path())
} }
// Duplicated from trie/encoding.go
func hexToKeyBytes(hex []byte) []byte {
if hasTerm(hex) {
hex = hex[:len(hex)-1]
}
if len(hex)&1 != 0 {
panic("can't convert hex key of odd length")
}
key := make([]byte, (len(hex)+1)/2)
decodeNibbles(hex, key)
return key
}
func decodeNibbles(nibbles []byte, bytes []byte) {
for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 {
bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1]
}
}
// hasTerm returns whether a hex key has the terminator flag. // hasTerm returns whether a hex key has the terminator flag.
func hasTerm(s []byte) bool { func hasTerm(s []byte) bool {
return len(s) > 0 && s[len(s)-1] == 16 return len(s) > 0 && s[len(s)-1] == 16

View File

@ -40,6 +40,7 @@ type blockChain interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
GetBlockByHash(hash common.Hash) *types.Block GetBlockByHash(hash common.Hash) *types.Block
AddToStateDiffProcessedCollection(hash common.Hash) AddToStateDiffProcessedCollection(hash common.Hash)
GetReceiptsByHash(hash common.Hash) types.Receipts
} }
// IService is the state-diffing service interface // IService is the state-diffing service interface
@ -69,12 +70,12 @@ type Service struct {
// Cache the last block so that we can avoid having to lookup the next block's parent // Cache the last block so that we can avoid having to lookup the next block's parent
lastBlock *types.Block lastBlock *types.Block
// Whether or not the block data is streamed alongside the state diff data in the subscription payload // Whether or not the block data is streamed alongside the state diff data in the subscription payload
streamBlock bool StreamBlock bool
// Whether or not we have any subscribers; only if we do, do we processes state diffs // Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32 subscribers int32
} }
// NewStateDiffService creates a new StateDiffingService // NewStateDiffService creates a new statediff.Service
func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Config) (*Service, error) { func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Config) (*Service, error) {
return &Service{ return &Service{
Mutex: sync.Mutex{}, Mutex: sync.Mutex{},
@ -82,7 +83,7 @@ func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config
Builder: NewBuilder(db, blockChain, config), Builder: NewBuilder(db, blockChain, config),
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[rpc.ID]Subscription), Subscriptions: make(map[rpc.ID]Subscription),
streamBlock: config.StreamBlock, StreamBlock: config.StreamBlock,
}, nil }, nil
} }
@ -91,7 +92,7 @@ func (sds *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{} return []p2p.Protocol{}
} }
// APIs returns the RPC descriptors the StateDiffingService offers // APIs returns the RPC descriptors the statediff.Service offers
func (sds *Service) APIs() []rpc.API { func (sds *Service) APIs() []rpc.API {
return []rpc.API{ return []rpc.API{
{ {
@ -108,7 +109,6 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
defer chainEventSub.Unsubscribe() defer chainEventSub.Unsubscribe()
errCh := chainEventSub.Err() errCh := chainEventSub.Err()
for { for {
select { select {
//Notify chain event channel of events //Notify chain event channel of events
@ -147,7 +147,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
} }
} }
// processStateDiff method builds the state diff payload from the current and parent block and sends it to listening subscriptions // processStateDiff method builds the state diff payload from the current and parent block before sending it to listening subscriptions
func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error { func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error {
stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash()) stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash())
if err != nil { if err != nil {
@ -159,22 +159,26 @@ func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) err
} }
payload := Payload{ payload := Payload{
StateDiffRlp: stateDiffRlp, StateDiffRlp: stateDiffRlp,
Err: err,
} }
if sds.streamBlock { if sds.StreamBlock {
rlpBuff := new(bytes.Buffer) blockBuff := new(bytes.Buffer)
if err = currentBlock.EncodeRLP(rlpBuff); err != nil { if err = currentBlock.EncodeRLP(blockBuff); err != nil {
return err return err
} }
payload.BlockRlp = rlpBuff.Bytes() payload.BlockRlp = blockBuff.Bytes()
receiptBuff := new(bytes.Buffer)
receipts := sds.BlockChain.GetReceiptsByHash(currentBlock.Hash())
if err = rlp.Encode(receiptBuff, receipts); err != nil {
return err
}
payload.ReceiptsRlp = receiptBuff.Bytes()
} }
// If we have any websocket subscriptions listening in, send the data to them
sds.send(payload) sds.send(payload)
return nil return nil
} }
// Subscribe is used by the API to subscribe to the StateDiffingService loop // Subscribe is used by the API to subscribe to the service loop
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) { func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
log.Info("Subscribing to the statediff service") log.Info("Subscribing to the statediff service")
if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) { if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
@ -188,7 +192,7 @@ func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo
sds.Unlock() sds.Unlock()
} }
// Unsubscribe is used to unsubscribe to the StateDiffingService loop // Unsubscribe is used to unsubscribe from the service loop
func (sds *Service) Unsubscribe(id rpc.ID) error { func (sds *Service) Unsubscribe(id rpc.ID) error {
log.Info("Unsubscribing from the statediff service") log.Info("Unsubscribing from the statediff service")
sds.Lock() sds.Lock()
@ -206,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
return nil return nil
} }
// Start is used to begin the StateDiffingService // Start is used to begin the service
func (sds *Service) Start(*p2p.Server) error { func (sds *Service) Start(*p2p.Server) error {
log.Info("Starting statediff service") log.Info("Starting statediff service")
@ -216,14 +220,14 @@ func (sds *Service) Start(*p2p.Server) error {
return nil return nil
} }
// Stop is used to close down the StateDiffingService // Stop is used to close down the service
func (sds *Service) Stop() error { func (sds *Service) Stop() error {
log.Info("Stopping statediff service") log.Info("Stopping statediff service")
close(sds.QuitChan) close(sds.QuitChan)
return nil return nil
} }
// send is used to fan out and serve the statediff payload to all subscriptions // send is used to fan out and serve the payloads to all subscriptions
func (sds *Service) send(payload Payload) { func (sds *Service) send(payload Payload) {
sds.Lock() sds.Lock()
for id, sub := range sds.Subscriptions { for id, sub := range sds.Subscriptions {

View File

@ -21,11 +21,13 @@ import (
"math/big" "math/big"
"math/rand" "math/rand"
"reflect" "reflect"
"sync"
"testing" "testing"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff"
"github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks"
@ -33,7 +35,7 @@ import (
func TestServiceLoop(t *testing.T) { func TestServiceLoop(t *testing.T) {
testErrorInChainEventLoop(t) testErrorInChainEventLoop(t)
//testErrorInBlockLoop(t) testErrorInBlockLoop(t)
} }
var ( var (
@ -61,6 +63,12 @@ var (
testBlock2 = types.NewBlock(&header2, nil, nil, nil) testBlock2 = types.NewBlock(&header2, nil, nil, nil)
testBlock3 = types.NewBlock(&header3, nil, nil, nil) testBlock3 = types.NewBlock(&header3, nil, nil, nil)
receiptRoot1 = common.HexToHash("0x05")
receiptRoot2 = common.HexToHash("0x06")
receiptRoot3 = common.HexToHash("0x07")
testReceipts1 = []*types.Receipt{types.NewReceipt(receiptRoot1.Bytes(), false, 1000), types.NewReceipt(receiptRoot2.Bytes(), false, 2000)}
testReceipts2 = []*types.Receipt{types.NewReceipt(receiptRoot3.Bytes(), false, 3000)}
event1 = core.ChainEvent{Block: testBlock1} event1 = core.ChainEvent{Block: testBlock1}
event2 = core.ChainEvent{Block: testBlock2} event2 = core.ChainEvent{Block: testBlock2}
event3 = core.ChainEvent{Block: testBlock3} event3 = core.ChainEvent{Block: testBlock3}
@ -71,43 +79,79 @@ func testErrorInChainEventLoop(t *testing.T) {
builder := mocks.Builder{} builder := mocks.Builder{}
blockChain := mocks.BlockChain{} blockChain := mocks.BlockChain{}
service := statediff.Service{ service := statediff.Service{
Mutex: sync.Mutex{},
Builder: &builder, Builder: &builder,
BlockChain: &blockChain, BlockChain: &blockChain,
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[rpc.ID]statediff.Subscription), Subscriptions: make(map[rpc.ID]statediff.Subscription),
StreamBlock: true,
} }
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload, 2)
quitChan := make(chan bool) quitChan := make(chan bool)
service.Subscribe(rpc.NewID(), payloadChan, quitChan) service.Subscribe(rpc.NewID(), payloadChan, quitChan)
testRoot2 = common.HexToHash("0xTestRoot2") testRoot2 = common.HexToHash("0xTestRoot2")
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) blockMapping := make(map[common.Hash]*types.Block)
blockMapping[parentBlock1.Hash()] = parentBlock1
blockMapping[parentBlock2.Hash()] = parentBlock2
blockChain.SetParentBlocksToReturn(blockMapping)
blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3})
// Need to have listeners on the channels or the subscription will be closed and the processing halted blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1)
blockChain.SetReceiptsForHash(testBlock2.Hash(), testReceipts2)
payloads := make([]statediff.Payload, 0, 2)
wg := sync.WaitGroup{}
go func() { go func() {
wg.Add(1)
for i := 0; i < 2; i++ {
select { select {
case <-payloadChan: case payload := <-payloadChan:
payloads = append(payloads, payload)
case <-quitChan: case <-quitChan:
} }
}
wg.Done()
}() }()
service.Loop(eventsChannel) service.Loop(eventsChannel)
wg.Wait()
if len(payloads) != 2 {
t.Error("Test failure:", t.Name())
t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: 3", len(payloads))
}
testReceipts1Rlp, err := rlp.EncodeToBytes(testReceipts1)
if err != nil {
t.Error(err)
}
testReceipts2Rlp, err := rlp.EncodeToBytes(testReceipts2)
if err != nil {
t.Error(err)
}
expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil}
for i, payload := range payloads {
if !bytes.Equal(payload.ReceiptsRlp, expectedReceiptsRlp[i]) {
t.Error("Test failure:", t.Name())
t.Logf("Actual receipt rlp for payload %d does not equal expected.\nactual: %+v\nexpected: %+v", i, payload.ReceiptsRlp, expectedReceiptsRlp[i])
}
}
if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) t.Logf("Actual blockhash does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash())
} }
if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) { if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root()) t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root())
} }
if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) { if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root()) t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root())
} }
//look up the parent block from its hash //look up the parent block from its hash
expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()} expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()}
if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) { if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) {
t.Error("Test failure:", t.Name()) t.Error("Test failure:", t.Name())
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes) t.Logf("Actual parent hash does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes)
} }
} }
@ -121,9 +165,20 @@ func testErrorInBlockLoop(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[rpc.ID]statediff.Subscription), Subscriptions: make(map[rpc.ID]statediff.Subscription),
} }
payloadChan := make(chan statediff.Payload)
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil}) quitChan := make(chan bool)
service.Subscribe(rpc.NewID(), payloadChan, quitChan)
blockMapping := make(map[common.Hash]*types.Block)
blockMapping[parentBlock1.Hash()] = parentBlock1
blockChain.SetParentBlocksToReturn(blockMapping)
blockChain.SetChainEvents([]core.ChainEvent{event1, event2}) blockChain.SetChainEvents([]core.ChainEvent{event1, event2})
// Need to have listeners on the channels or the subscription will be closed and the processing halted
go func() {
select {
case <-payloadChan:
case <-quitChan:
}
}()
service.Loop(eventsChannel) service.Loop(eventsChannel)
if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) { if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) {

View File

@ -102,7 +102,6 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block)
} }
payload := statediff.Payload{ payload := statediff.Payload{
StateDiffRlp: stateDiffRlp, StateDiffRlp: stateDiffRlp,
Err: err,
} }
if sds.streamBlock { if sds.streamBlock {
rlpBuff := new(bytes.Buffer) rlpBuff := new(bytes.Buffer)
@ -119,7 +118,7 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block)
// Subscribe mock method // Subscribe mock method
func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Payload, quitChan chan<- bool) { func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Payload, quitChan chan<- bool) {
log.Info("Subscribing to the statediff service") log.Info("Subscribing to the mock statediff service")
sds.Lock() sds.Lock()
sds.Subscriptions[id] = statediff.Subscription{ sds.Subscriptions[id] = statediff.Subscription{
PayloadChan: sub, PayloadChan: sub,
@ -130,7 +129,7 @@ func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Paylo
// Unsubscribe mock method // Unsubscribe mock method
func (sds *MockStateDiffService) Unsubscribe(id rpc.ID) error { func (sds *MockStateDiffService) Unsubscribe(id rpc.ID) error {
log.Info("Unsubscribing from the statediff service") log.Info("Unsubscribing from the mock statediff service")
sds.Lock() sds.Lock()
_, ok := sds.Subscriptions[id] _, ok := sds.Subscriptions[id]
if !ok { if !ok {
@ -170,9 +169,9 @@ func (sds *MockStateDiffService) close() {
// Start mock method // Start mock method
func (sds *MockStateDiffService) Start(server *p2p.Server) error { func (sds *MockStateDiffService) Start(server *p2p.Server) error {
log.Info("Starting statediff service") log.Info("Starting mock statediff service")
if sds.ParentBlockChan == nil || sds.BlockChan == nil { if sds.ParentBlockChan == nil || sds.BlockChan == nil {
return errors.New("mock StateDiffingService requires preconfiguration with a MockParentBlockChan and MockBlockChan") return errors.New("MockStateDiffingService needs to be configured with a MockParentBlockChan and MockBlockChan")
} }
chainEventCh := make(chan core.ChainEvent, 10) chainEventCh := make(chan core.ChainEvent, 10)
go sds.Loop(chainEventCh) go sds.Loop(chainEventCh)
@ -182,7 +181,7 @@ func (sds *MockStateDiffService) Start(server *p2p.Server) error {
// Stop mock method // Stop mock method
func (sds *MockStateDiffService) Stop() error { func (sds *MockStateDiffService) Stop() error {
log.Info("Stopping statediff service") log.Info("Stopping mock statediff service")
close(sds.QuitChan) close(sds.QuitChan)
return nil return nil
} }

View File

@ -136,9 +136,6 @@ func TestAPI(t *testing.T) {
if !bytes.Equal(payload.StateDiffRlp, expectedStateDiffBytes) { if !bytes.Equal(payload.StateDiffRlp, expectedStateDiffBytes) {
t.Errorf("payload does not have expected state diff\r\actual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateDiffRlp, expectedStateDiffBytes) t.Errorf("payload does not have expected state diff\r\actual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateDiffRlp, expectedStateDiffBytes)
} }
if payload.Err != nil {
t.Errorf("payload should not contain an error, but does: %v", payload.Err)
}
case <-quitChan: case <-quitChan:
t.Errorf("channel quit before delivering payload") t.Errorf("channel quit before delivering payload")
} }

View File

@ -30,16 +30,20 @@ import (
// BlockChain is a mock blockchain for testing // BlockChain is a mock blockchain for testing
type BlockChain struct { type BlockChain struct {
ParentHashesLookedUp []common.Hash ParentHashesLookedUp []common.Hash
parentBlocksToReturn []*types.Block parentBlocksToReturn map[common.Hash]*types.Block
callCount int callCount int
ChainEvents []core.ChainEvent ChainEvents []core.ChainEvent
Receipts map[common.Hash]types.Receipts
} }
// AddToStateDiffProcessedCollection mock method // AddToStateDiffProcessedCollection mock method
func (blockChain *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {} func (blockChain *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {}
// SetParentBlocksToReturn mock method // SetParentBlocksToReturn mock method
func (blockChain *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) { func (blockChain *BlockChain) SetParentBlocksToReturn(blocks map[common.Hash]*types.Block) {
if blockChain.parentBlocksToReturn == nil {
blockChain.parentBlocksToReturn = make(map[common.Hash]*types.Block)
}
blockChain.parentBlocksToReturn = blocks blockChain.parentBlocksToReturn = blocks
} }
@ -49,10 +53,9 @@ func (blockChain *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
var parentBlock *types.Block var parentBlock *types.Block
if len(blockChain.parentBlocksToReturn) > 0 { if len(blockChain.parentBlocksToReturn) > 0 {
parentBlock = blockChain.parentBlocksToReturn[blockChain.callCount] parentBlock = blockChain.parentBlocksToReturn[hash]
} }
blockChain.callCount++
return parentBlock return parentBlock
} }
@ -84,3 +87,16 @@ func (blockChain *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) eve
return subscription return subscription
} }
// SetReceiptsForHash mock method
func (blockChain *BlockChain) SetReceiptsForHash(hash common.Hash, receipts types.Receipts) {
if blockChain.Receipts == nil {
blockChain.Receipts = make(map[common.Hash]types.Receipts)
}
blockChain.Receipts[hash] = receipts
}
// GetReceiptsByHash mock method
func (blockChain *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
return blockChain.Receipts[hash]
}

View File

@ -34,11 +34,32 @@ type Subscription struct {
QuitChan chan<- bool QuitChan chan<- bool
} }
// Payload packages the data to send to StateDiffingService subscriptions // Payload packages the data to send to statediff subscriptions
type Payload struct { type Payload struct {
BlockRlp []byte `json:"blockRlp" gencodec:"required"` BlockRlp []byte `json:"blockRlp"`
ReceiptsRlp []byte `json:"receiptsRlp"`
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
Err error `json:"error"`
encoded []byte
err error
}
func (sd *Payload) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for Payload
func (sd *Payload) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for Payload
func (sd *Payload) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
} }
// StateDiff is the final output structure from the builder // StateDiff is the final output structure from the builder
@ -53,24 +74,6 @@ type StateDiff struct {
err error err error
} }
func (sd *StateDiff) ensureEncoded() {
if sd.encoded == nil && sd.err == nil {
sd.encoded, sd.err = json.Marshal(sd)
}
}
// Length to implement Encoder interface for StateDiff
func (sd *StateDiff) Length() int {
sd.ensureEncoded()
return len(sd.encoded)
}
// Encode to implement Encoder interface for StateDiff
func (sd *StateDiff) Encode() ([]byte, error) {
sd.ensureEncoded()
return sd.encoded, sd.err
}
// AccountDiff holds the data for a single state diff node // AccountDiff holds the data for a single state diff node
type AccountDiff struct { type AccountDiff struct {
Leaf bool `json:"leaf" gencodec:"required"` Leaf bool `json:"leaf" gencodec:"required"`