relay receipts with the rest of the data + review fixes/changes
This commit is contained in:
parent
f3e4e85cd7
commit
b3f10c9e69
@ -1391,7 +1391,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
||||
break
|
||||
}
|
||||
if bc.cacheConfig.ProcessingStateDiffs {
|
||||
if !bc.allowedRootToBeDereferenced(root.(common.Hash)) {
|
||||
if !bc.rootAllowedToBeDereferenced(root.(common.Hash)) {
|
||||
bc.triegc.Push(root, number)
|
||||
break
|
||||
} else {
|
||||
@ -1464,7 +1464,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
||||
// since we need the state tries of the current block and its parent in-memory
|
||||
// in order to process statediffs, we should avoid dereferencing roots until
|
||||
// its statediff and its child have been processed
|
||||
func (bc *BlockChain) allowedRootToBeDereferenced(root common.Hash) bool {
|
||||
func (bc *BlockChain) rootAllowedToBeDereferenced(root common.Hash) bool {
|
||||
diffProcessedForSelfAndChildCount := 2
|
||||
count := bc.stateDiffsProcessed[root]
|
||||
return count >= diffProcessedForSelfAndChildCount
|
||||
|
@ -29,21 +29,21 @@ const APIName = "statediff"
|
||||
// APIVersion is the version of the state diffing service API
|
||||
const APIVersion = "0.0.1"
|
||||
|
||||
// PublicStateDiffAPI provides the a websocket service
|
||||
// PublicStateDiffAPI provides an RPC subscription interface
|
||||
// that can be used to stream out state diffs as they
|
||||
// are produced by a full node
|
||||
type PublicStateDiffAPI struct {
|
||||
sds IService
|
||||
}
|
||||
|
||||
// NewPublicStateDiffAPI create a new state diff websocket streaming service.
|
||||
// NewPublicStateDiffAPI creates an rpc subscription interface for the underlying statediff service
|
||||
func NewPublicStateDiffAPI(sds IService) *PublicStateDiffAPI {
|
||||
return &PublicStateDiffAPI{
|
||||
sds: sds,
|
||||
}
|
||||
}
|
||||
|
||||
// Stream is the public method to setup a subscription that fires off state-diff payloads as they are created
|
||||
// Stream is the public method to setup a subscription that fires off statediff service payloads as they are created
|
||||
func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, error) {
|
||||
// ensure that the RPC connection supports subscriptions
|
||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||
@ -51,19 +51,19 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
|
||||
return nil, rpc.ErrNotificationsUnsupported
|
||||
}
|
||||
|
||||
// create subscription and start waiting for statediff events
|
||||
// create subscription and start waiting for events
|
||||
rpcSub := notifier.CreateSubscription()
|
||||
|
||||
go func() {
|
||||
// subscribe to events from the state diff service
|
||||
// subscribe to events from the statediff service
|
||||
payloadChannel := make(chan Payload, chainEventChanSize)
|
||||
quitChan := make(chan bool, 1)
|
||||
api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan)
|
||||
// loop and await state diff payloads and relay them to the subscriber with the notifier
|
||||
// loop and await payloads and relay them to the subscriber with the notifier
|
||||
for {
|
||||
select {
|
||||
case packet := <-payloadChannel:
|
||||
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
|
||||
case payload := <-payloadChannel:
|
||||
if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
|
||||
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
|
||||
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
|
||||
if unSubErr != nil {
|
||||
@ -81,7 +81,7 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
|
||||
return
|
||||
}
|
||||
case <-quitChan:
|
||||
// don't need to unsubscribe, statediff service does so before sending the quit signal
|
||||
// don't need to unsubscribe, service does so before sending the quit signal
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"math/big"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/state"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
@ -49,7 +48,7 @@ type builder struct {
|
||||
stateCache state.Database
|
||||
}
|
||||
|
||||
// NewBuilder is used to create a state diff builder
|
||||
// NewBuilder is used to create a statediff builder
|
||||
func NewBuilder(db ethdb.Database, blockChain *core.BlockChain, config Config) Builder {
|
||||
return &builder{
|
||||
chainDB: db,
|
||||
@ -58,7 +57,7 @@ func NewBuilder(db ethdb.Database, blockChain *core.BlockChain, config Config) B
|
||||
}
|
||||
}
|
||||
|
||||
// BuildStateDiff builds a StateDiff object from two blocks
|
||||
// BuildStateDiff builds a statediff object from two blocks
|
||||
func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, blockNumber *big.Int, blockHash common.Hash) (StateDiff, error) {
|
||||
// Generate tries for old and new states
|
||||
sdb.stateCache = sdb.blockChain.StateCache()
|
||||
@ -115,8 +114,9 @@ func (sdb *builder) BuildStateDiff(oldStateRoot, newStateRoot common.Hash, block
|
||||
}, nil
|
||||
}
|
||||
|
||||
// isWatchedAddress is used to check if a state account corresponds to one of the addresses the builder is configured to watch
|
||||
func (sdb *builder) isWatchedAddress(hashKey []byte) bool {
|
||||
// If we aren't watching any addresses, we are watching everything
|
||||
// If we aren't watching any specific addresses, we are watching everything
|
||||
if len(sdb.config.WatchedAddresses) == 0 {
|
||||
return true
|
||||
}
|
||||
@ -318,15 +318,3 @@ func (sdb *builder) buildStorageDiffsFromTrie(it trie.NodeIterator) ([]StorageDi
|
||||
|
||||
return storageDiffs, nil
|
||||
}
|
||||
|
||||
func (sdb *builder) addressByPath(path []byte) (*common.Address, error) {
|
||||
log.Debug("Looking up address from path", "path", hexutil.Encode(append([]byte("secure-key-"), path...)))
|
||||
addrBytes, err := sdb.chainDB.Get(append([]byte("secure-key-"), hexToKeyBytes(path)...))
|
||||
if err != nil {
|
||||
log.Error("Error looking up address via path", "path", hexutil.Encode(append([]byte("secure-key-"), path...)), "error", err)
|
||||
return nil, err
|
||||
}
|
||||
addr := common.BytesToAddress(addrBytes)
|
||||
log.Debug("Address found", "Address", addr)
|
||||
return &addr, nil
|
||||
}
|
||||
|
@ -15,11 +15,11 @@
|
||||
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
/*
|
||||
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
|
||||
|
||||
Package statediff provides an auxiliary service that processes state diff objects from incoming chain events,
|
||||
relaying the objects to any rpc subscriptions.
|
||||
|
||||
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
|
||||
|
||||
The service is spun up using the below CLI flags
|
||||
--statediff: boolean flag, turns on the service
|
||||
--statediff.streamblock: boolean flag, configures the service to associate and stream out the rest of the block data with the state diffs.
|
||||
@ -27,7 +27,16 @@ The service is spun up using the below CLI flags
|
||||
--statediff.pathsandproofs: boolean flag, tells service to generate paths and proofs for the diffed storage and state trie leaf nodes.
|
||||
--statediff.watchedaddresses: string slice flag, used to limit the state diffing process to the given addresses. Usage: --statediff.watchedaddresses=addr1 --statediff.watchedaddresses=addr2 --statediff.watchedaddresses=addr3
|
||||
|
||||
If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag.
|
||||
If you wish to use the websocket endpoint to subscribe to the statediff service, be sure to open up the Websocket RPC server with the `--ws` flag. The IPC-RPC server is turned on by default.
|
||||
|
||||
The statediffing services works only with `--syncmode="full", but -importantly- does not require garbage collection to be turned off (does not require an archival node).
|
||||
|
||||
e.g.
|
||||
|
||||
$ ./geth --statediff --statediff.streamblock --ws --syncmode "full"
|
||||
|
||||
This starts up the geth node in full sync mode, starts up the statediffing service, and opens up the websocket endpoint to subscribe to the service.
|
||||
Because the "streamblock" flag has been turned on, the service will strean out block data (headers, transactions, and receipts) along with the diffed state and storage leafs.
|
||||
|
||||
Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method,
|
||||
with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream".
|
||||
@ -41,7 +50,7 @@ for {
|
||||
select {
|
||||
case stateDiffPayload := <- stateDiffPayloadChan:
|
||||
processPayload(stateDiffPayload)
|
||||
case err := <= rpcSub.Err():
|
||||
case err := <- rpcSub.Err():
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ func sortKeys(data AccountsMap) []string {
|
||||
return keys
|
||||
}
|
||||
|
||||
// BytesToNiblePath
|
||||
// bytesToNiblePath converts the byte representation of a path to its string representation
|
||||
func bytesToNiblePath(path []byte) string {
|
||||
if hasTerm(path) {
|
||||
path = path[:len(path)-1]
|
||||
@ -53,6 +53,8 @@ func bytesToNiblePath(path []byte) string {
|
||||
return nibblePath
|
||||
}
|
||||
|
||||
// findIntersection finds the set of strings from both arrays that are equivalent (same key as same index)
|
||||
// this is used to find which keys have been both "deleted" and "created" i.e. they were updated
|
||||
func findIntersection(a, b []string) []string {
|
||||
lenA := len(a)
|
||||
lenB := len(b)
|
||||
@ -63,13 +65,13 @@ func findIntersection(a, b []string) []string {
|
||||
}
|
||||
for {
|
||||
switch strings.Compare(a[iOfA], b[iOfB]) {
|
||||
// a[iOfA] < b[iOfB]
|
||||
// -1 when a[iOfA] < b[iOfB]
|
||||
case -1:
|
||||
iOfA++
|
||||
if iOfA >= lenA {
|
||||
return updates
|
||||
}
|
||||
// a[iOfA] == b[iOfB]
|
||||
// 0 when a[iOfA] == b[iOfB]
|
||||
case 0:
|
||||
updates = append(updates, a[iOfA])
|
||||
iOfA++
|
||||
@ -77,7 +79,7 @@ func findIntersection(a, b []string) []string {
|
||||
if iOfA >= lenA || iOfB >= lenB {
|
||||
return updates
|
||||
}
|
||||
// a[iOfA] > b[iOfB]
|
||||
// 1 when a[iOfA] > b[iOfB]
|
||||
case 1:
|
||||
iOfB++
|
||||
if iOfB >= lenB {
|
||||
@ -88,30 +90,11 @@ func findIntersection(a, b []string) []string {
|
||||
|
||||
}
|
||||
|
||||
// pathToStr converts the NodeIterator path to a string representation
|
||||
func pathToStr(it trie.NodeIterator) string {
|
||||
return bytesToNiblePath(it.Path())
|
||||
}
|
||||
|
||||
// Duplicated from trie/encoding.go
|
||||
func hexToKeyBytes(hex []byte) []byte {
|
||||
if hasTerm(hex) {
|
||||
hex = hex[:len(hex)-1]
|
||||
}
|
||||
if len(hex)&1 != 0 {
|
||||
panic("can't convert hex key of odd length")
|
||||
}
|
||||
key := make([]byte, (len(hex)+1)/2)
|
||||
decodeNibbles(hex, key)
|
||||
|
||||
return key
|
||||
}
|
||||
|
||||
func decodeNibbles(nibbles []byte, bytes []byte) {
|
||||
for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 {
|
||||
bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1]
|
||||
}
|
||||
}
|
||||
|
||||
// hasTerm returns whether a hex key has the terminator flag.
|
||||
func hasTerm(s []byte) bool {
|
||||
return len(s) > 0 && s[len(s)-1] == 16
|
||||
|
@ -40,6 +40,7 @@ type blockChain interface {
|
||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||
GetBlockByHash(hash common.Hash) *types.Block
|
||||
AddToStateDiffProcessedCollection(hash common.Hash)
|
||||
GetReceiptsByHash(hash common.Hash) types.Receipts
|
||||
}
|
||||
|
||||
// IService is the state-diffing service interface
|
||||
@ -69,12 +70,12 @@ type Service struct {
|
||||
// Cache the last block so that we can avoid having to lookup the next block's parent
|
||||
lastBlock *types.Block
|
||||
// Whether or not the block data is streamed alongside the state diff data in the subscription payload
|
||||
streamBlock bool
|
||||
StreamBlock bool
|
||||
// Whether or not we have any subscribers; only if we do, do we processes state diffs
|
||||
subscribers int32
|
||||
}
|
||||
|
||||
// NewStateDiffService creates a new StateDiffingService
|
||||
// NewStateDiffService creates a new statediff.Service
|
||||
func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config Config) (*Service, error) {
|
||||
return &Service{
|
||||
Mutex: sync.Mutex{},
|
||||
@ -82,7 +83,7 @@ func NewStateDiffService(db ethdb.Database, blockChain *core.BlockChain, config
|
||||
Builder: NewBuilder(db, blockChain, config),
|
||||
QuitChan: make(chan bool),
|
||||
Subscriptions: make(map[rpc.ID]Subscription),
|
||||
streamBlock: config.StreamBlock,
|
||||
StreamBlock: config.StreamBlock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -91,7 +92,7 @@ func (sds *Service) Protocols() []p2p.Protocol {
|
||||
return []p2p.Protocol{}
|
||||
}
|
||||
|
||||
// APIs returns the RPC descriptors the StateDiffingService offers
|
||||
// APIs returns the RPC descriptors the statediff.Service offers
|
||||
func (sds *Service) APIs() []rpc.API {
|
||||
return []rpc.API{
|
||||
{
|
||||
@ -108,7 +109,6 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
||||
chainEventSub := sds.BlockChain.SubscribeChainEvent(chainEventCh)
|
||||
defer chainEventSub.Unsubscribe()
|
||||
errCh := chainEventSub.Err()
|
||||
|
||||
for {
|
||||
select {
|
||||
//Notify chain event channel of events
|
||||
@ -147,7 +147,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
// processStateDiff method builds the state diff payload from the current and parent block and sends it to listening subscriptions
|
||||
// processStateDiff method builds the state diff payload from the current and parent block before sending it to listening subscriptions
|
||||
func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) error {
|
||||
stateDiff, err := sds.Builder.BuildStateDiff(parentBlock.Root(), currentBlock.Root(), currentBlock.Number(), currentBlock.Hash())
|
||||
if err != nil {
|
||||
@ -159,22 +159,26 @@ func (sds *Service) processStateDiff(currentBlock, parentBlock *types.Block) err
|
||||
}
|
||||
payload := Payload{
|
||||
StateDiffRlp: stateDiffRlp,
|
||||
Err: err,
|
||||
}
|
||||
if sds.streamBlock {
|
||||
rlpBuff := new(bytes.Buffer)
|
||||
if err = currentBlock.EncodeRLP(rlpBuff); err != nil {
|
||||
if sds.StreamBlock {
|
||||
blockBuff := new(bytes.Buffer)
|
||||
if err = currentBlock.EncodeRLP(blockBuff); err != nil {
|
||||
return err
|
||||
}
|
||||
payload.BlockRlp = rlpBuff.Bytes()
|
||||
payload.BlockRlp = blockBuff.Bytes()
|
||||
receiptBuff := new(bytes.Buffer)
|
||||
receipts := sds.BlockChain.GetReceiptsByHash(currentBlock.Hash())
|
||||
if err = rlp.Encode(receiptBuff, receipts); err != nil {
|
||||
return err
|
||||
}
|
||||
payload.ReceiptsRlp = receiptBuff.Bytes()
|
||||
}
|
||||
|
||||
// If we have any websocket subscriptions listening in, send the data to them
|
||||
sds.send(payload)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Subscribe is used by the API to subscribe to the StateDiffingService loop
|
||||
// Subscribe is used by the API to subscribe to the service loop
|
||||
func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- bool) {
|
||||
log.Info("Subscribing to the statediff service")
|
||||
if atomic.CompareAndSwapInt32(&sds.subscribers, 0, 1) {
|
||||
@ -188,7 +192,7 @@ func (sds *Service) Subscribe(id rpc.ID, sub chan<- Payload, quitChan chan<- boo
|
||||
sds.Unlock()
|
||||
}
|
||||
|
||||
// Unsubscribe is used to unsubscribe to the StateDiffingService loop
|
||||
// Unsubscribe is used to unsubscribe from the service loop
|
||||
func (sds *Service) Unsubscribe(id rpc.ID) error {
|
||||
log.Info("Unsubscribing from the statediff service")
|
||||
sds.Lock()
|
||||
@ -206,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Start is used to begin the StateDiffingService
|
||||
// Start is used to begin the service
|
||||
func (sds *Service) Start(*p2p.Server) error {
|
||||
log.Info("Starting statediff service")
|
||||
|
||||
@ -216,14 +220,14 @@ func (sds *Service) Start(*p2p.Server) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop is used to close down the StateDiffingService
|
||||
// Stop is used to close down the service
|
||||
func (sds *Service) Stop() error {
|
||||
log.Info("Stopping statediff service")
|
||||
close(sds.QuitChan)
|
||||
return nil
|
||||
}
|
||||
|
||||
// send is used to fan out and serve the statediff payload to all subscriptions
|
||||
// send is used to fan out and serve the payloads to all subscriptions
|
||||
func (sds *Service) send(payload Payload) {
|
||||
sds.Lock()
|
||||
for id, sub := range sds.Subscriptions {
|
||||
|
@ -21,11 +21,13 @@ import (
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
"github.com/ethereum/go-ethereum/statediff"
|
||||
"github.com/ethereum/go-ethereum/statediff/testhelpers/mocks"
|
||||
@ -33,7 +35,7 @@ import (
|
||||
|
||||
func TestServiceLoop(t *testing.T) {
|
||||
testErrorInChainEventLoop(t)
|
||||
//testErrorInBlockLoop(t)
|
||||
testErrorInBlockLoop(t)
|
||||
}
|
||||
|
||||
var (
|
||||
@ -61,6 +63,12 @@ var (
|
||||
testBlock2 = types.NewBlock(&header2, nil, nil, nil)
|
||||
testBlock3 = types.NewBlock(&header3, nil, nil, nil)
|
||||
|
||||
receiptRoot1 = common.HexToHash("0x05")
|
||||
receiptRoot2 = common.HexToHash("0x06")
|
||||
receiptRoot3 = common.HexToHash("0x07")
|
||||
testReceipts1 = []*types.Receipt{types.NewReceipt(receiptRoot1.Bytes(), false, 1000), types.NewReceipt(receiptRoot2.Bytes(), false, 2000)}
|
||||
testReceipts2 = []*types.Receipt{types.NewReceipt(receiptRoot3.Bytes(), false, 3000)}
|
||||
|
||||
event1 = core.ChainEvent{Block: testBlock1}
|
||||
event2 = core.ChainEvent{Block: testBlock2}
|
||||
event3 = core.ChainEvent{Block: testBlock3}
|
||||
@ -71,43 +79,79 @@ func testErrorInChainEventLoop(t *testing.T) {
|
||||
builder := mocks.Builder{}
|
||||
blockChain := mocks.BlockChain{}
|
||||
service := statediff.Service{
|
||||
Mutex: sync.Mutex{},
|
||||
Builder: &builder,
|
||||
BlockChain: &blockChain,
|
||||
QuitChan: make(chan bool),
|
||||
Subscriptions: make(map[rpc.ID]statediff.Subscription),
|
||||
StreamBlock: true,
|
||||
}
|
||||
payloadChan := make(chan statediff.Payload)
|
||||
payloadChan := make(chan statediff.Payload, 2)
|
||||
quitChan := make(chan bool)
|
||||
service.Subscribe(rpc.NewID(), payloadChan, quitChan)
|
||||
testRoot2 = common.HexToHash("0xTestRoot2")
|
||||
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, parentBlock2})
|
||||
blockMapping := make(map[common.Hash]*types.Block)
|
||||
blockMapping[parentBlock1.Hash()] = parentBlock1
|
||||
blockMapping[parentBlock2.Hash()] = parentBlock2
|
||||
blockChain.SetParentBlocksToReturn(blockMapping)
|
||||
blockChain.SetChainEvents([]core.ChainEvent{event1, event2, event3})
|
||||
// Need to have listeners on the channels or the subscription will be closed and the processing halted
|
||||
blockChain.SetReceiptsForHash(testBlock1.Hash(), testReceipts1)
|
||||
blockChain.SetReceiptsForHash(testBlock2.Hash(), testReceipts2)
|
||||
|
||||
payloads := make([]statediff.Payload, 0, 2)
|
||||
wg := sync.WaitGroup{}
|
||||
go func() {
|
||||
select {
|
||||
case <-payloadChan:
|
||||
case <-quitChan:
|
||||
wg.Add(1)
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case payload := <-payloadChan:
|
||||
payloads = append(payloads, payload)
|
||||
case <-quitChan:
|
||||
}
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
service.Loop(eventsChannel)
|
||||
wg.Wait()
|
||||
if len(payloads) != 2 {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: 3", len(payloads))
|
||||
}
|
||||
|
||||
testReceipts1Rlp, err := rlp.EncodeToBytes(testReceipts1)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
testReceipts2Rlp, err := rlp.EncodeToBytes(testReceipts2)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
expectedReceiptsRlp := [][]byte{testReceipts1Rlp, testReceipts2Rlp, nil}
|
||||
for i, payload := range payloads {
|
||||
if !bytes.Equal(payload.ReceiptsRlp, expectedReceiptsRlp[i]) {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual receipt rlp for payload %d does not equal expected.\nactual: %+v\nexpected: %+v", i, payload.ReceiptsRlp, expectedReceiptsRlp[i])
|
||||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(builder.BlockHash, testBlock2.Hash()) {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash())
|
||||
t.Logf("Actual blockhash does not equal expected.\nactual:%+v\nexpected: %+v", builder.BlockHash, testBlock2.Hash())
|
||||
}
|
||||
if !bytes.Equal(builder.OldStateRoot.Bytes(), parentBlock2.Root().Bytes()) {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root())
|
||||
t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.OldStateRoot, parentBlock2.Root())
|
||||
}
|
||||
if !bytes.Equal(builder.NewStateRoot.Bytes(), testBlock2.Root().Bytes()) {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root())
|
||||
t.Logf("Actual root does not equal expected.\nactual:%+v\nexpected: %+v", builder.NewStateRoot, testBlock2.Root())
|
||||
}
|
||||
//look up the parent block from its hash
|
||||
expectedHashes := []common.Hash{testBlock1.ParentHash(), testBlock2.ParentHash()}
|
||||
if !reflect.DeepEqual(blockChain.ParentHashesLookedUp, expectedHashes) {
|
||||
t.Error("Test failure:", t.Name())
|
||||
t.Logf("Actual does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes)
|
||||
t.Logf("Actual parent hash does not equal expected.\nactual:%+v\nexpected: %+v", blockChain.ParentHashesLookedUp, expectedHashes)
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,9 +165,20 @@ func testErrorInBlockLoop(t *testing.T) {
|
||||
QuitChan: make(chan bool),
|
||||
Subscriptions: make(map[rpc.ID]statediff.Subscription),
|
||||
}
|
||||
|
||||
blockChain.SetParentBlocksToReturn([]*types.Block{parentBlock1, nil})
|
||||
payloadChan := make(chan statediff.Payload)
|
||||
quitChan := make(chan bool)
|
||||
service.Subscribe(rpc.NewID(), payloadChan, quitChan)
|
||||
blockMapping := make(map[common.Hash]*types.Block)
|
||||
blockMapping[parentBlock1.Hash()] = parentBlock1
|
||||
blockChain.SetParentBlocksToReturn(blockMapping)
|
||||
blockChain.SetChainEvents([]core.ChainEvent{event1, event2})
|
||||
// Need to have listeners on the channels or the subscription will be closed and the processing halted
|
||||
go func() {
|
||||
select {
|
||||
case <-payloadChan:
|
||||
case <-quitChan:
|
||||
}
|
||||
}()
|
||||
service.Loop(eventsChannel)
|
||||
|
||||
if !bytes.Equal(builder.BlockHash.Bytes(), testBlock1.Hash().Bytes()) {
|
||||
|
@ -102,7 +102,6 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block)
|
||||
}
|
||||
payload := statediff.Payload{
|
||||
StateDiffRlp: stateDiffRlp,
|
||||
Err: err,
|
||||
}
|
||||
if sds.streamBlock {
|
||||
rlpBuff := new(bytes.Buffer)
|
||||
@ -119,7 +118,7 @@ func (sds *MockStateDiffService) process(currentBlock, parentBlock *types.Block)
|
||||
|
||||
// Subscribe mock method
|
||||
func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Payload, quitChan chan<- bool) {
|
||||
log.Info("Subscribing to the statediff service")
|
||||
log.Info("Subscribing to the mock statediff service")
|
||||
sds.Lock()
|
||||
sds.Subscriptions[id] = statediff.Subscription{
|
||||
PayloadChan: sub,
|
||||
@ -130,7 +129,7 @@ func (sds *MockStateDiffService) Subscribe(id rpc.ID, sub chan<- statediff.Paylo
|
||||
|
||||
// Unsubscribe mock method
|
||||
func (sds *MockStateDiffService) Unsubscribe(id rpc.ID) error {
|
||||
log.Info("Unsubscribing from the statediff service")
|
||||
log.Info("Unsubscribing from the mock statediff service")
|
||||
sds.Lock()
|
||||
_, ok := sds.Subscriptions[id]
|
||||
if !ok {
|
||||
@ -170,9 +169,9 @@ func (sds *MockStateDiffService) close() {
|
||||
|
||||
// Start mock method
|
||||
func (sds *MockStateDiffService) Start(server *p2p.Server) error {
|
||||
log.Info("Starting statediff service")
|
||||
log.Info("Starting mock statediff service")
|
||||
if sds.ParentBlockChan == nil || sds.BlockChan == nil {
|
||||
return errors.New("mock StateDiffingService requires preconfiguration with a MockParentBlockChan and MockBlockChan")
|
||||
return errors.New("MockStateDiffingService needs to be configured with a MockParentBlockChan and MockBlockChan")
|
||||
}
|
||||
chainEventCh := make(chan core.ChainEvent, 10)
|
||||
go sds.Loop(chainEventCh)
|
||||
@ -182,7 +181,7 @@ func (sds *MockStateDiffService) Start(server *p2p.Server) error {
|
||||
|
||||
// Stop mock method
|
||||
func (sds *MockStateDiffService) Stop() error {
|
||||
log.Info("Stopping statediff service")
|
||||
log.Info("Stopping mock statediff service")
|
||||
close(sds.QuitChan)
|
||||
return nil
|
||||
}
|
||||
|
@ -136,9 +136,6 @@ func TestAPI(t *testing.T) {
|
||||
if !bytes.Equal(payload.StateDiffRlp, expectedStateDiffBytes) {
|
||||
t.Errorf("payload does not have expected state diff\r\actual state diff rlp: %v\r\nexpected state diff rlp: %v", payload.StateDiffRlp, expectedStateDiffBytes)
|
||||
}
|
||||
if payload.Err != nil {
|
||||
t.Errorf("payload should not contain an error, but does: %v", payload.Err)
|
||||
}
|
||||
case <-quitChan:
|
||||
t.Errorf("channel quit before delivering payload")
|
||||
}
|
||||
|
@ -30,16 +30,20 @@ import (
|
||||
// BlockChain is a mock blockchain for testing
|
||||
type BlockChain struct {
|
||||
ParentHashesLookedUp []common.Hash
|
||||
parentBlocksToReturn []*types.Block
|
||||
parentBlocksToReturn map[common.Hash]*types.Block
|
||||
callCount int
|
||||
ChainEvents []core.ChainEvent
|
||||
Receipts map[common.Hash]types.Receipts
|
||||
}
|
||||
|
||||
// AddToStateDiffProcessedCollection mock method
|
||||
func (blockChain *BlockChain) AddToStateDiffProcessedCollection(hash common.Hash) {}
|
||||
|
||||
// SetParentBlocksToReturn mock method
|
||||
func (blockChain *BlockChain) SetParentBlocksToReturn(blocks []*types.Block) {
|
||||
func (blockChain *BlockChain) SetParentBlocksToReturn(blocks map[common.Hash]*types.Block) {
|
||||
if blockChain.parentBlocksToReturn == nil {
|
||||
blockChain.parentBlocksToReturn = make(map[common.Hash]*types.Block)
|
||||
}
|
||||
blockChain.parentBlocksToReturn = blocks
|
||||
}
|
||||
|
||||
@ -49,10 +53,9 @@ func (blockChain *BlockChain) GetBlockByHash(hash common.Hash) *types.Block {
|
||||
|
||||
var parentBlock *types.Block
|
||||
if len(blockChain.parentBlocksToReturn) > 0 {
|
||||
parentBlock = blockChain.parentBlocksToReturn[blockChain.callCount]
|
||||
parentBlock = blockChain.parentBlocksToReturn[hash]
|
||||
}
|
||||
|
||||
blockChain.callCount++
|
||||
return parentBlock
|
||||
}
|
||||
|
||||
@ -84,3 +87,16 @@ func (blockChain *BlockChain) SubscribeChainEvent(ch chan<- core.ChainEvent) eve
|
||||
|
||||
return subscription
|
||||
}
|
||||
|
||||
// SetReceiptsForHash mock method
|
||||
func (blockChain *BlockChain) SetReceiptsForHash(hash common.Hash, receipts types.Receipts) {
|
||||
if blockChain.Receipts == nil {
|
||||
blockChain.Receipts = make(map[common.Hash]types.Receipts)
|
||||
}
|
||||
blockChain.Receipts[hash] = receipts
|
||||
}
|
||||
|
||||
// GetReceiptsByHash mock method
|
||||
func (blockChain *BlockChain) GetReceiptsByHash(hash common.Hash) types.Receipts {
|
||||
return blockChain.Receipts[hash]
|
||||
}
|
||||
|
@ -34,17 +34,38 @@ type Subscription struct {
|
||||
QuitChan chan<- bool
|
||||
}
|
||||
|
||||
// Payload packages the data to send to StateDiffingService subscriptions
|
||||
// Payload packages the data to send to statediff subscriptions
|
||||
type Payload struct {
|
||||
BlockRlp []byte `json:"blockRlp" gencodec:"required"`
|
||||
BlockRlp []byte `json:"blockRlp"`
|
||||
ReceiptsRlp []byte `json:"receiptsRlp"`
|
||||
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
|
||||
Err error `json:"error"`
|
||||
|
||||
encoded []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func (sd *Payload) ensureEncoded() {
|
||||
if sd.encoded == nil && sd.err == nil {
|
||||
sd.encoded, sd.err = json.Marshal(sd)
|
||||
}
|
||||
}
|
||||
|
||||
// Length to implement Encoder interface for Payload
|
||||
func (sd *Payload) Length() int {
|
||||
sd.ensureEncoded()
|
||||
return len(sd.encoded)
|
||||
}
|
||||
|
||||
// Encode to implement Encoder interface for Payload
|
||||
func (sd *Payload) Encode() ([]byte, error) {
|
||||
sd.ensureEncoded()
|
||||
return sd.encoded, sd.err
|
||||
}
|
||||
|
||||
// StateDiff is the final output structure from the builder
|
||||
type StateDiff struct {
|
||||
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
|
||||
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
|
||||
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
|
||||
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
|
||||
CreatedAccounts []AccountDiff `json:"createdAccounts" gencodec:"required"`
|
||||
DeletedAccounts []AccountDiff `json:"deletedAccounts" gencodec:"required"`
|
||||
UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`
|
||||
@ -53,24 +74,6 @@ type StateDiff struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (sd *StateDiff) ensureEncoded() {
|
||||
if sd.encoded == nil && sd.err == nil {
|
||||
sd.encoded, sd.err = json.Marshal(sd)
|
||||
}
|
||||
}
|
||||
|
||||
// Length to implement Encoder interface for StateDiff
|
||||
func (sd *StateDiff) Length() int {
|
||||
sd.ensureEncoded()
|
||||
return len(sd.encoded)
|
||||
}
|
||||
|
||||
// Encode to implement Encoder interface for StateDiff
|
||||
func (sd *StateDiff) Encode() ([]byte, error) {
|
||||
sd.ensureEncoded()
|
||||
return sd.encoded, sd.err
|
||||
}
|
||||
|
||||
// AccountDiff holds the data for a single state diff node
|
||||
type AccountDiff struct {
|
||||
Leaf bool `json:"leaf" gencodec:"required"`
|
||||
|
Loading…
Reference in New Issue
Block a user