diff --git a/core/blockchain.go b/core/blockchain.go index 880fa6bed..87d5a6d43 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1391,7 +1391,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. break } if bc.cacheConfig.ProcessingStateDiffs { - if !bc.allowedRootToBeDereferenced(root.(common.Hash)) { + if !bc.rootAllowedToBeDereferenced(root.(common.Hash)) { bc.triegc.Push(root, number) break } else { @@ -1464,7 +1464,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. // since we need the state tries of the current block and its parent in-memory // in order to process statediffs, we should avoid dereferencing roots until // its statediff and its child have been processed -func (bc *BlockChain) allowedRootToBeDereferenced(root common.Hash) bool { +func (bc *BlockChain) rootAllowedToBeDereferenced(root common.Hash) bool { diffProcessedForSelfAndChildCount := 2 count := bc.stateDiffsProcessed[root] return count >= diffProcessedForSelfAndChildCount diff --git a/statediff/api.go b/statediff/api.go index a05ef5510..52c604f97 100644 --- a/statediff/api.go +++ b/statediff/api.go @@ -29,21 +29,21 @@ const APIName = "statediff" // APIVersion is the version of the state diffing service API const APIVersion = "0.0.1" -// PublicStateDiffAPI provides the a websocket service +// PublicStateDiffAPI provides an RPC subscription interface // that can be used to stream out state diffs as they // are produced by a full node type PublicStateDiffAPI struct { sds IService } -// NewPublicStateDiffAPI create a new state diff websocket streaming service. +// NewPublicStateDiffAPI creates an rpc subscription interface for the underlying statediff service func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI { return &PublicStateDiffAPI{ sds: sds, } } -// Stream is the public method to setup a subscription that fires off state-diff payloads as they are created +// Stream is the public method to setup a subscription that fires off statediff service payloads as they are created func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) { // ensure that the RPC connection supports subscriptions notifier, supported := rpc.NotifierFromContext(ctx) @@ -51,19 +51,19 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e return nil, rpc.ErrNotificationsUnsupported } - // create subscription and start waiting for statediff events + // create subscription and start waiting for events rpcSub := notifier.CreateSubscription() go func() { - // subscribe to events from the state diff service + // subscribe to events from the statediff service payloadChannel := make(chan Payload, chainEventChanSize) quitChan := make(chan bool, 1) api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan) - // loop and await state diff payloads and relay them to the subscriber with the notifier + // loop and await payloads and relay them to the subscriber with the notifier for { select { - case packet := <-payloadChannel: - if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil { + case payload := <-payloadChannel: + if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil { log.Error("Failed to send state diff packet; error: " + notifyErr.Error()) unSubErr := api.sds.Unsubscribe(rpcSub.ID) if unSubErr != nil { @@ -81,7 +81,7 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e return } case <-quitChan: - // don't need to unsubscribe, statediff service does so before sending the quit signal + // don't need to unsubscribe, service does so before sending the quit signal return } } diff --git a/statediff/builder.go b/statediff/builder.go index 765152e87..bbd523a5f 100644 --- a/statediff/builder.go +++ b/statediff/builder.go @@ -25,7 +25,6 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/crypto" @@ -49,7 +48,7 @@ type builder struct { stateCache state.Database } -// NewBuilder is used to create a state diff builder +// NewBuilder is used to create a statediff builder func NewBuilder(db ethdb.Database, blockChain *core.BlockChain, config Config) Builder { return &builder{ chainDB: db, @@ -58,7 +57,7 @@ func NewBuilder(db ethdb.Database, blockChain *core.BlockChain, config Config) B } } -// BuildStateDiff builds a StateDiff object from two blocks +// BuildStateDiff builds a statediff object from two blocks func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, blockNumber *big.Int, blockHash common.Hash) (StateDiff, error) { // Generate tries for old and new states sdb.stateCache = sdb.blockChain.StateCache() @@ -115,8 +114,9 @@ func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, block }, nil } +// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch func (sdb *builder) isWatchedAddress(hashKey []byte) bool { - // If we aren't watching any addresses, we are watching everything + // If we aren't watching any specific addresses, we are watching everything if len(sdb.config.WatchedAddresses) == 0 { return true } @@ -318,15 +318,3 @@ func (sdb *builder) buildStorageDiffsFromTrie(it trie.NodeIterator) ([]StorageDi return storageDiffs, nil } - -func (sdb *builder) addressByPath(path []byte) (*common.Address, error) { - log.Debug("Looking up address from path", "path", hexutil.Encode(append([]byte("secure-key-"), path...))) - addrBytes, err := sdb.chainDB.Get(append([]byte("secure-key-"), hexToKeyBytes(path)...)) - if err != nil { - log.Error("Error looking up address via path", "path", hexutil.Encode(append([]byte("secure-key-"), path...)), "error", err) - return nil, err - } - addr := common.BytesToAddress(addrBytes) - log.Debug("Address found", "Address", addr) - return &addr, nil -} diff --git a/statediff/doc.go b/statediff/doc.go index 0e6d5f3e1..35c48c02d 100644 --- a/statediff/doc.go +++ b/statediff/doc.go @@ -15,11 +15,11 @@ // along with the go-ethereum library. If not, see . /* -This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go - Package statediff provides an auxiliary service that processes state diff objects from incoming chain events, relaying the objects to any rpc subscriptions. +This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go + The service is spun up using the below CLI flags --statediff: boolean flag, turns on the service --statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs. @@ -27,7 +27,16 @@ The service is spun up using the below CLI flags --statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes. --statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3 -If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. +If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. The IPC-RPC server is turned on by default. + +The statediffing services works only with `--syncmode="full", but -importantly- does not require garbage collection to be turned off (does not require an archival node). + +e.g. + +$ ./geth --statediff --statediff.streamblock --ws --syncmode "full" + +This starts up the geth node in full sync mode, starts up the statediffing service, and opens up the websocket endpoint to subscribe to the service. +Because the "streamblock" flag has been turned on, the service will strean out block data (headers, transactions, and receipts) along with the diffed state and storage leafs. Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method, with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream". @@ -41,7 +50,7 @@ for { select { case stateDiffPayload := <- stateDiffPayloadChan: processPayload(stateDiffPayload) - case err := <= rpcSub.Err(): + case err := <- rpcSub.Err(): log.Error(err) } } diff --git a/statediff/helpers.go b/statediff/helpers.go index 89852b55c..2c5ddeb45 100644 --- a/statediff/helpers.go +++ b/statediff/helpers.go @@ -37,7 +37,7 @@ func sortKeys(data AccountsMap) []string { return keys } -// BytesToNiblePath +// bytesToNiblePath converts the byte representation of a path to its string representation func bytesToNiblePath(path []byte) string { if hasTerm(path) { path = path[:len(path)-1] @@ -53,6 +53,8 @@ func bytesToNiblePath(path []byte) string { return nibblePath } +// findIntersection finds the set of strings from both arrays that are equivalent (same key as same index) +// this is used to find which keys have been both "deleted" and "created" i.e. they were updated func findIntersection(a, b []string) []string { lenA := len(a) lenB := len(b) @@ -63,13 +65,13 @@ func findIntersection(a, b []string) []string { } for { switch strings.Compare(a[iOfA], b[iOfB]) { - // a[iOfA] < b[iOfB] + // -1 when a[iOfA] < b[iOfB] case -1: iOfA++ if iOfA >= lenA { return updates } - // a[iOfA] == b[iOfB] + // 0 when a[iOfA] == b[iOfB] case 0: updates = append(updates, a[iOfA]) iOfA++ @@ -77,7 +79,7 @@ func findIntersection(a, b []string) []string { if iOfA >= lenA || iOfB >= lenB { return updates } - // a[iOfA] > b[iOfB] + // 1 when a[iOfA] > b[iOfB] case 1: iOfB++ if iOfB >= lenB { @@ -88,30 +90,11 @@ func findIntersection(a, b []string) []string { } +// pathToStr converts the NodeIterator path to a string representation func pathToStr(it trie.NodeIterator) string { return bytesToNiblePath(it.Path()) } -// Duplicated from trie/encoding.go -func hexToKeyBytes(hex []byte) []byte { - if hasTerm(hex) { - hex = hex[:len(hex)-1] - } - if len(hex)&1 != 0 { - panic("can't convert hex key of odd length") - } - key := make([]byte, (len(hex)+1)/2) - decodeNibbles(hex, key) - - return key -} - -func decodeNibbles(nibbles []byte, bytes []byte) { - for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 { - bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1] - } -} - // hasTerm returns whether a hex key has the terminator flag. func hasTerm(s []byte) bool { return len(s) > 0 && s[len(s)-1] == 16 diff --git a/statediff/service.go b/statediff/service.go index e978fcb8f..d3eab1065 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -40,6 +40,7 @@ type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block AddToStateDiffProcessedCollection(hash common.Hash) + GetReceiptsByHash(hash common.Hash) types.Receipts } // IService is the state-diffing service interface @@ -69,12 +70,12 @@ type Service struct { // Cache the last block so that we can avoid having to lookup the next block's parent lastBlock *types.Block // Whether or not the block data is streamed alongside the state diff data in the subscription payload - streamBlock bool + StreamBlock bool // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 } -// NewStateDiffService creates a new StateDiffingService +// NewStateDiffService creates a new statediff.Service func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Config) (*Service, error) { return &Service{ Mutex: sync.Mutex{}, @@ -82,7 +83,7 @@ func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Builder: NewBuilder(db, blockChain, config), QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]Subscription), - streamBlock: config.StreamBlock, + StreamBlock: config.StreamBlock, }, nil } @@ -91,7 +92,7 @@ func (sds *Service) Protocols() []p2p.Protocol { return []p2p.Protocol{} } -// APIs returns the RPC descriptors the StateDiffingService offers +// APIs returns the RPC descriptors the statediff.Service offers func (sds *Service) APIs() []rpc.API { return []rpc.API{ { @@ -108,7 +109,6 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() - for { select { //Notify chain event channel of events @@ -147,7 +147,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { } } -// processStateDiff method builds the state diff payload from the current and parent block and sends it to listening subscriptions +// processStateDiff method builds the state diff payload from the current and parent block before sending it to listening subscriptions func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error { stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash()) if err != nil { @@ -159,22 +159,26 @@ func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) err } payload := Payload{ StateDiffRlp: stateDiffRlp, - Err: err, } - if sds.streamBlock { - rlpBuff := new(bytes.Buffer) - if err = currentBlock.EncodeRLP(rlpBuff); err != nil { + if sds.StreamBlock { + blockBuff := new(bytes.Buffer) + if err = currentBlock.EncodeRLP(blockBuff); err != nil { return err } - payload.BlockRlp = rlpBuff.Bytes() + payload.BlockRlp = blockBuff.Bytes() + receiptBuff := new(bytes.Buffer) + receipts := sds.BlockChain.GetReceiptsByHash(currentBlock.Hash()) + if err = rlp.Encode(receiptBuff, receipts); err != nil { + return err + } + payload.ReceiptsRlp = receiptBuff.Bytes() } - // If we have any websocket subscriptions listening in, send the data to them sds.send(payload) return nil } -// Subscribe is used by the API to subscribe to the StateDiffingService loop +// Subscribe is used by the API to subscribe to the service loop func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) { log.Info("Subscribing to the statediff service") if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) { @@ -188,7 +192,7 @@ func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo sds.Unlock() } -// Unsubscribe is used to unsubscribe to the StateDiffingService loop +// Unsubscribe is used to unsubscribe from the service loop func (sds *Service) Unsubscribe(id rpc.ID) error { log.Info("Unsubscribing from the statediff service") sds.Lock() @@ -206,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error { return nil } -// Start is used to begin the StateDiffingService +// Start is used to begin the service func (sds *Service) Start(*p2p.Server) error { log.Info("Starting statediff service") @@ -216,14 +220,14 @@ func (sds *Service) Start(*p2p.Server) error { return nil } -// Stop is used to close down the StateDiffingService +// Stop is used to close down the service func (sds *Service) Stop() error { log.Info("Stopping statediff service") close(sds.QuitChan) return nil } -// send is used to fan out and serve the statediff payload to all subscriptions +// send is used to fan out and serve the payloads to all subscriptions func (sds *Service) send(payload Payload) { sds.Lock() for id, sub := range sds.Subscriptions { diff --git a/statediff/service_test.go b/statediff/service_test.go index de3df3dd2..6119f6ecb 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -21,11 +21,13 @@ import ( "math/big" "math/rand" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" @@ -33,7 +35,7 @@ import ( func TestServiceLoop(t *testing.T) { testErrorInChainEventLoop(t) - //testErrorInBlockLoop(t) + testErrorInBlockLoop(t) } var ( @@ -61,6 +63,12 @@ var ( testBlock2 = types.NewBlock(&header2, nil, nil, nil) testBlock3 = types.NewBlock(&header3, nil, nil, nil) + receiptRoot1 = common.HexToHash("0x05") + receiptRoot2 = common.HexToHash("0x06") + receiptRoot3 = common.HexToHash("0x07") + testReceipts1 = []*types.Receipt{types.NewReceipt(receiptRoot1.Bytes(), false, 1000), types.NewReceipt(receiptRoot2.Bytes(), false, 2000)} + testReceipts2 = []*types.Receipt{types.NewReceipt(receiptRoot3.Bytes(), false, 3000)} + event1 = core.ChainEvent{Block: testBlock1} event2 = core.ChainEvent{Block: testBlock2} event3 = core.ChainEvent{Block: testBlock3} @@ -71,43 +79,79 @@ func testErrorInChainEventLoop(t *testing.T) { builder := mocks.Builder{} blockChain := mocks.BlockChain{} service := statediff.Service{ + Mutex: sync.Mutex{}, Builder: &builder, BlockChain: &blockChain, QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]statediff.Subscription), + StreamBlock: true, } - payloadChan := make(chan statediff.Payload) + payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) service.Subscribe(rpc.NewID(), payloadChan, quitChan) testRoot2 = common.HexToHash("0xTestRoot2") - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockMapping[parentBlock2.Hash()] = parentBlock2 + blockChain.SetParentBlocksToReturn(blockMapping) blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) - // Need to have listeners on the channels or the subscription will be closed and the processing halted + blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1) + blockChain.SetReceiptsForHash(testBlock2.Hash(), testReceipts2) + + payloads := make([]statediff.Payload, 0, 2) + wg := sync.WaitGroup{} go func() { - select { - case <-payloadChan: - case <-quitChan: + wg.Add(1) + for i := 0; i < 2; i++ { + select { + case payload := <-payloadChan: + payloads = append(payloads, payload) + case <-quitChan: + } } + wg.Done() }() + service.Loop(eventsChannel) + wg.Wait() + if len(payloads) != 2 { + t.Error("Test failure:", t.Name()) + t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: 3", len(payloads)) + } + + testReceipts1Rlp, err := rlp.EncodeToBytes(testReceipts1) + if err != nil { + t.Error(err) + } + testReceipts2Rlp, err := rlp.EncodeToBytes(testReceipts2) + if err != nil { + t.Error(err) + } + expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil} + for i, payload := range payloads { + if !bytes.Equal(payload.ReceiptsRlp, expectedReceiptsRlp[i]) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual receipt rlp for payload %d does not equal expected.\nactual: %+v\nexpected: %+v", i, payload.ReceiptsRlp, expectedReceiptsRlp[i]) + } + } if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) + t.Logf("Actual blockhash does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash()) } if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) { t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root()) + t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root()) } if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) { t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root()) + t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root()) } //look up the parent block from its hash expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()} if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) { t.Error("Test failure:", t.Name()) - t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes) + t.Logf("Actual parent hash does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes) } } @@ -121,9 +165,20 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]statediff.Subscription), } - - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil}) + payloadChan := make(chan statediff.Payload) + quitChan := make(chan bool) + service.Subscribe(rpc.NewID(), payloadChan, quitChan) + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockChain.SetParentBlocksToReturn(blockMapping) blockChain.SetChainEvents([]core.ChainEvent{event1, event2}) + // Need to have listeners on the channels or the subscription will be closed and the processing halted + go func() { + select { + case <-payloadChan: + case <-quitChan: + } + }() service.Loop(eventsChannel) if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) { diff --git a/statediff/testhelpers/mocks/api.go b/statediff/testhelpers/mocks/api.go index 687a7c77d..3b43ab7dd 100644 --- a/statediff/testhelpers/mocks/api.go +++ b/statediff/testhelpers/mocks/api.go @@ -102,7 +102,6 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block) } payload := statediff.Payload{ StateDiffRlp: stateDiffRlp, - Err: err, } if sds.streamBlock { rlpBuff := new(bytes.Buffer) @@ -119,7 +118,7 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block) // Subscribe mock method func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Payload, quitChan chan<- bool) { - log.Info("Subscribing to the statediff service") + log.Info("Subscribing to the mock statediff service") sds.Lock() sds.Subscriptions[id] = statediff.Subscription{ PayloadChan: sub, @@ -130,7 +129,7 @@ func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Paylo // Unsubscribe mock method func (sds *MockStateDiffService) Unsubscribe(id rpc.ID) error { - log.Info("Unsubscribing from the statediff service") + log.Info("Unsubscribing from the mock statediff service") sds.Lock() _, ok := sds.Subscriptions[id] if !ok { @@ -170,9 +169,9 @@ func (sds *MockStateDiffService) close() { // Start mock method func (sds *MockStateDiffService) Start(server *p2p.Server) error { - log.Info("Starting statediff service") + log.Info("Starting mock statediff service") if sds.ParentBlockChan == nil || sds.BlockChan == nil { - return errors.New("mock StateDiffingService requires preconfiguration with a MockParentBlockChan and MockBlockChan") + return errors.New("MockStateDiffingService needs to be configured with a MockParentBlockChan and MockBlockChan") } chainEventCh := make(chan core.ChainEvent, 10) go sds.Loop(chainEventCh) @@ -182,7 +181,7 @@ func (sds *MockStateDiffService) Start(server *p2p.Server) error { // Stop mock method func (sds *MockStateDiffService) Stop() error { - log.Info("Stopping statediff service") + log.Info("Stopping mock statediff service") close(sds.QuitChan) return nil } diff --git a/statediff/testhelpers/mocks/api_test.go b/statediff/testhelpers/mocks/api_test.go index 22971b4b6..b76ba4328 100644 --- a/statediff/testhelpers/mocks/api_test.go +++ b/statediff/testhelpers/mocks/api_test.go @@ -136,9 +136,6 @@ func TestAPI(t *testing.T) { if !bytes.Equal(payload.StateDiffRlp, expectedStateDiffBytes) { t.Errorf("payload does not have expected state diff\r\actual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateDiffRlp, expectedStateDiffBytes) } - if payload.Err != nil { - t.Errorf("payload should not contain an error, but does: %v", payload.Err) - } case <-quitChan: t.Errorf("channel quit before delivering payload") } diff --git a/statediff/testhelpers/mocks/blockchain.go b/statediff/testhelpers/mocks/blockchain.go index f2c097d38..508435236 100644 --- a/statediff/testhelpers/mocks/blockchain.go +++ b/statediff/testhelpers/mocks/blockchain.go @@ -30,16 +30,20 @@ import ( // BlockChain is a mock blockchain for testing type BlockChain struct { ParentHashesLookedUp []common.Hash - parentBlocksToReturn []*types.Block + parentBlocksToReturn map[common.Hash]*types.Block callCount int ChainEvents []core.ChainEvent + Receipts map[common.Hash]types.Receipts } // AddToStateDiffProcessedCollection mock method func (blockChain *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {} // SetParentBlocksToReturn mock method -func (blockChain *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) { +func (blockChain *BlockChain) SetParentBlocksToReturn(blocks map[common.Hash]*types.Block) { + if blockChain.parentBlocksToReturn == nil { + blockChain.parentBlocksToReturn = make(map[common.Hash]*types.Block) + } blockChain.parentBlocksToReturn = blocks } @@ -49,10 +53,9 @@ func (blockChain *BlockChain) GetBlockByHash(hash common.Hash) *types.Block { var parentBlock *types.Block if len(blockChain.parentBlocksToReturn) > 0 { - parentBlock = blockChain.parentBlocksToReturn[blockChain.callCount] + parentBlock = blockChain.parentBlocksToReturn[hash] } - blockChain.callCount++ return parentBlock } @@ -84,3 +87,16 @@ func (blockChain *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) eve return subscription } + +// SetReceiptsForHash mock method +func (blockChain *BlockChain) SetReceiptsForHash(hash common.Hash, receipts types.Receipts) { + if blockChain.Receipts == nil { + blockChain.Receipts = make(map[common.Hash]types.Receipts) + } + blockChain.Receipts[hash] = receipts +} + +// GetReceiptsByHash mock method +func (blockChain *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + return blockChain.Receipts[hash] +} diff --git a/statediff/types.go b/statediff/types.go index 6df398a10..3bfb598a5 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -34,17 +34,38 @@ type Subscription struct { QuitChan chan<- bool } -// Payload packages the data to send to StateDiffingService subscriptions +// Payload packages the data to send to statediff subscriptions type Payload struct { - BlockRlp []byte `json:"blockRlp" gencodec:"required"` + BlockRlp []byte `json:"blockRlp"` + ReceiptsRlp []byte `json:"receiptsRlp"` StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` - Err error `json:"error"` + + encoded []byte + err error +} + +func (sd *Payload) ensureEncoded() { + if sd.encoded == nil && sd.err == nil { + sd.encoded, sd.err = json.Marshal(sd) + } +} + +// Length to implement Encoder interface for Payload +func (sd *Payload) Length() int { + sd.ensureEncoded() + return len(sd.encoded) +} + +// Encode to implement Encoder interface for Payload +func (sd *Payload) Encode() ([]byte, error) { + sd.ensureEncoded() + return sd.encoded, sd.err } // StateDiff is the final output structure from the builder type StateDiff struct { - BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` - BlockHash common.Hash `json:"blockHash" gencodec:"required"` + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` CreatedAccounts []AccountDiff `json:"createdAccounts" gencodec:"required"` DeletedAccounts []AccountDiff `json:"deletedAccounts" gencodec:"required"` UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"` @@ -53,24 +74,6 @@ type StateDiff struct { err error } -func (sd *StateDiff) ensureEncoded() { - if sd.encoded == nil && sd.err == nil { - sd.encoded, sd.err = json.Marshal(sd) - } -} - -// Length to implement Encoder interface for StateDiff -func (sd *StateDiff) Length() int { - sd.ensureEncoded() - return len(sd.encoded) -} - -// Encode to implement Encoder interface for StateDiff -func (sd *StateDiff) Encode() ([]byte, error) { - sd.ensureEncoded() - return sd.encoded, sd.err -} - // AccountDiff holds the data for a single state diff node type AccountDiff struct { Leaf bool `json:"leaf" gencodec:"required"`