diff --git a/statediff/service.go b/statediff/service.go index e978fcb8f..be3dc6cf9 100644 --- a/statediff/service.go +++ b/statediff/service.go @@ -40,6 +40,7 @@ type blockChain interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription GetBlockByHash(hash common.Hash) *types.Block AddToStateDiffProcessedCollection(hash common.Hash) + GetReceiptsByHash(hash common.Hash) types.Receipts } // IService is the state-diffing service interface @@ -69,7 +70,7 @@ type Service struct { // Cache the last block so that we can avoid having to lookup the next block's parent lastBlock *types.Block // Whether or not the block data is streamed alongside the state diff data in the subscription payload - streamBlock bool + StreamBlock bool // Whether or not we have any subscribers; only if we do, do we processes state diffs subscribers int32 } @@ -82,7 +83,7 @@ func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Builder: NewBuilder(db, blockChain, config), QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]Subscription), - streamBlock: config.StreamBlock, + StreamBlock: config.StreamBlock, }, nil } @@ -108,7 +109,6 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) { chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh) defer chainEventSub.Unsubscribe() errCh := chainEventSub.Err() - for { select { //Notify chain event channel of events @@ -161,12 +161,19 @@ func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) err StateDiffRlp: stateDiffRlp, Err: err, } - if sds.streamBlock { - rlpBuff := new(bytes.Buffer) - if err = currentBlock.EncodeRLP(rlpBuff); err != nil { + if sds.StreamBlock { + blockBuff := new(bytes.Buffer) + if err = currentBlock.EncodeRLP(blockBuff); err != nil { return err } - payload.BlockRlp = rlpBuff.Bytes() + payload.BlockRlp = blockBuff.Bytes() + receiptBuff := new(bytes.Buffer) + receipts := sds.BlockChain.GetReceiptsByHash(currentBlock.Hash()) + if err = rlp.Encode(receiptBuff, receipts); err != nil { + println(err.Error()) + return err + } + payload.ReceiptsRlp = receiptBuff.Bytes() } // If we have any websocket subscriptions listening in, send the data to them diff --git a/statediff/service_test.go b/statediff/service_test.go index de3df3dd2..1508542b7 100644 --- a/statediff/service_test.go +++ b/statediff/service_test.go @@ -21,11 +21,13 @@ import ( "math/big" "math/rand" "reflect" + "sync" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/statediff" "github.com/ethereum/go-ethereum/statediff/testhelpers/mocks" @@ -33,7 +35,7 @@ import ( func TestServiceLoop(t *testing.T) { testErrorInChainEventLoop(t) - //testErrorInBlockLoop(t) + testErrorInBlockLoop(t) } var ( @@ -61,6 +63,12 @@ var ( testBlock2 = types.NewBlock(&header2, nil, nil, nil) testBlock3 = types.NewBlock(&header3, nil, nil, nil) + receiptRoot1 = common.HexToHash("0x05") + receiptRoot2 = common.HexToHash("0x06") + receiptRoot3 = common.HexToHash("0x07") + testReceipts1 = []*types.Receipt{types.NewReceipt(receiptRoot1.Bytes(), false, 1000), types.NewReceipt(receiptRoot2.Bytes(), false, 2000)} + testReceipts2 = []*types.Receipt{types.NewReceipt(receiptRoot3.Bytes(), false, 3000)} + event1 = core.ChainEvent{Block: testBlock1} event2 = core.ChainEvent{Block: testBlock2} event3 = core.ChainEvent{Block: testBlock3} @@ -71,25 +79,61 @@ func testErrorInChainEventLoop(t *testing.T) { builder := mocks.Builder{} blockChain := mocks.BlockChain{} service := statediff.Service{ + Mutex: sync.Mutex{}, Builder: &builder, BlockChain: &blockChain, QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]statediff.Subscription), + StreamBlock: true, } - payloadChan := make(chan statediff.Payload) + payloadChan := make(chan statediff.Payload, 2) quitChan := make(chan bool) service.Subscribe(rpc.NewID(), payloadChan, quitChan) testRoot2 = common.HexToHash("0xTestRoot2") - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2}) + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockMapping[parentBlock2.Hash()] = parentBlock2 + blockChain.SetParentBlocksToReturn(blockMapping) blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3}) - // Need to have listeners on the channels or the subscription will be closed and the processing halted + blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1) + blockChain.SetReceiptsForHash(testBlock2.Hash(), testReceipts2) + + payloads := make([]statediff.Payload, 0, 2) + wg := sync.WaitGroup{} go func() { - select { - case <-payloadChan: - case <-quitChan: + wg.Add(1) + for i := 0; i < 2; i++ { + select { + case payload := <-payloadChan: + payloads = append(payloads, payload) + case <-quitChan: + } } + wg.Done() }() + service.Loop(eventsChannel) + wg.Wait() + if len(payloads) != 2 { + t.Error("Test failure:", t.Name()) + t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: 3", len(payloads)) + } + + testReceipts1Rlp, err := rlp.EncodeToBytes(testReceipts1) + if err != nil { + t.Error(err) + } + testReceipts2Rlp, err := rlp.EncodeToBytes(testReceipts2) + if err != nil { + t.Error(err) + } + expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil} + for i, payload := range payloads { + if !bytes.Equal(payload.ReceiptsRlp, expectedReceiptsRlp[i]) { + t.Error("Test failure:", t.Name()) + t.Logf("Actual reeipt rlp for payload %d does not equal expected.\nactual: %+v\nexpected: %+v", i, payload.ReceiptsRlp, expectedReceiptsRlp[i]) + } + } if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) { t.Error("Test failure:", t.Name()) @@ -121,9 +165,20 @@ func testErrorInBlockLoop(t *testing.T) { QuitChan: make(chan bool), Subscriptions: make(map[rpc.ID]statediff.Subscription), } - - blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil}) + payloadChan := make(chan statediff.Payload) + quitChan := make(chan bool) + service.Subscribe(rpc.NewID(), payloadChan, quitChan) + blockMapping := make(map[common.Hash]*types.Block) + blockMapping[parentBlock1.Hash()] = parentBlock1 + blockChain.SetParentBlocksToReturn(blockMapping) blockChain.SetChainEvents([]core.ChainEvent{event1, event2}) + // Need to have listeners on the channels or the subscription will be closed and the processing halted + go func() { + select { + case <-payloadChan: + case <-quitChan: + } + }() service.Loop(eventsChannel) if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) { diff --git a/statediff/testhelpers/mocks/blockchain.go b/statediff/testhelpers/mocks/blockchain.go index f2c097d38..508435236 100644 --- a/statediff/testhelpers/mocks/blockchain.go +++ b/statediff/testhelpers/mocks/blockchain.go @@ -30,16 +30,20 @@ import ( // BlockChain is a mock blockchain for testing type BlockChain struct { ParentHashesLookedUp []common.Hash - parentBlocksToReturn []*types.Block + parentBlocksToReturn map[common.Hash]*types.Block callCount int ChainEvents []core.ChainEvent + Receipts map[common.Hash]types.Receipts } // AddToStateDiffProcessedCollection mock method func (blockChain *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {} // SetParentBlocksToReturn mock method -func (blockChain *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) { +func (blockChain *BlockChain) SetParentBlocksToReturn(blocks map[common.Hash]*types.Block) { + if blockChain.parentBlocksToReturn == nil { + blockChain.parentBlocksToReturn = make(map[common.Hash]*types.Block) + } blockChain.parentBlocksToReturn = blocks } @@ -49,10 +53,9 @@ func (blockChain *BlockChain) GetBlockByHash(hash common.Hash) *types.Block { var parentBlock *types.Block if len(blockChain.parentBlocksToReturn) > 0 { - parentBlock = blockChain.parentBlocksToReturn[blockChain.callCount] + parentBlock = blockChain.parentBlocksToReturn[hash] } - blockChain.callCount++ return parentBlock } @@ -84,3 +87,16 @@ func (blockChain *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) eve return subscription } + +// SetReceiptsForHash mock method +func (blockChain *BlockChain) SetReceiptsForHash(hash common.Hash, receipts types.Receipts) { + if blockChain.Receipts == nil { + blockChain.Receipts = make(map[common.Hash]types.Receipts) + } + blockChain.Receipts[hash] = receipts +} + +// GetReceiptsByHash mock method +func (blockChain *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts { + return blockChain.Receipts[hash] +} diff --git a/statediff/types.go b/statediff/types.go index 6df398a10..2db58d52b 100644 --- a/statediff/types.go +++ b/statediff/types.go @@ -36,15 +36,16 @@ type Subscription struct { // Payload packages the data to send to StateDiffingService subscriptions type Payload struct { - BlockRlp []byte `json:"blockRlp" gencodec:"required"` + BlockRlp []byte `json:"blockRlp"` + ReceiptsRlp []byte `json:"receiptsRlp"` StateDiffRlp []byte `json:"stateDiff" gencodec:"required"` Err error `json:"error"` } // StateDiff is the final output structure from the builder type StateDiff struct { - BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` - BlockHash common.Hash `json:"blockHash" gencodec:"required"` + BlockNumber *big.Int `json:"blockNumber" gencodec:"required"` + BlockHash common.Hash `json:"blockHash" gencodec:"required"` CreatedAccounts []AccountDiff `json:"createdAccounts" gencodec:"required"` DeletedAccounts []AccountDiff `json:"deletedAccounts" gencodec:"required"` UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`