Cleanup statedb test #23
71
.travis.yml
71
.travis.yml
@ -166,77 +166,6 @@ jobs:
|
|||||||
- go run build/ci.go xgo --alltools -- --targets=linux/mips64le --ldflags '-extldflags "-static"' -v
|
- go run build/ci.go xgo --alltools -- --targets=linux/mips64le --ldflags '-extldflags "-static"' -v
|
||||||
- for bin in build/bin/*-linux-mips64le; do mv -f "${bin}" "${bin/-linux-mips64le/}"; done
|
- for bin in build/bin/*-linux-mips64le; do mv -f "${bin}" "${bin/-linux-mips64le/}"; done
|
||||||
- go run build/ci.go archive -arch mips64le -type tar -signer LINUX_SIGNING_KEY -upload gethstore/builds
|
- go run build/ci.go archive -arch mips64le -type tar -signer LINUX_SIGNING_KEY -upload gethstore/builds
|
||||||
|
|
||||||
# This builder does the Android Maven and Azure uploads
|
|
||||||
- stage: build
|
|
||||||
if: type = push
|
|
||||||
os: linux
|
|
||||||
dist: xenial
|
|
||||||
addons:
|
|
||||||
apt:
|
|
||||||
packages:
|
|
||||||
- oracle-java8-installer
|
|
||||||
- oracle-java8-set-default
|
|
||||||
language: android
|
|
||||||
android:
|
|
||||||
components:
|
|
||||||
- platform-tools
|
|
||||||
- tools
|
|
||||||
- android-15
|
|
||||||
- android-19
|
|
||||||
- android-24
|
|
||||||
env:
|
|
||||||
- azure-android
|
|
||||||
- maven-android
|
|
||||||
- GO111MODULE=on
|
|
||||||
git:
|
|
||||||
submodules: false # avoid cloning ethereum/tests
|
|
||||||
before_install:
|
|
||||||
- curl https://dl.google.com/go/go1.14.2.linux-amd64.tar.gz | tar -xz
|
|
||||||
- export PATH=`pwd`/go/bin:$PATH
|
|
||||||
- export GOROOT=`pwd`/go
|
|
||||||
- export GOPATH=$HOME/go
|
|
||||||
script:
|
|
||||||
# Build the Android archive and upload it to Maven Central and Azure
|
|
||||||
- curl https://dl.google.com/android/repository/android-ndk-r19b-linux-x86_64.zip -o android-ndk-r19b.zip
|
|
||||||
- unzip -q android-ndk-r19b.zip && rm android-ndk-r19b.zip
|
|
||||||
- mv android-ndk-r19b $ANDROID_HOME/ndk-bundle
|
|
||||||
|
|
||||||
- mkdir -p $GOPATH/src/github.com/ethereum
|
|
||||||
- ln -s `pwd` $GOPATH/src/github.com/ethereum/go-ethereum
|
|
||||||
- go run build/ci.go aar -signer ANDROID_SIGNING_KEY -deploy https://oss.sonatype.org -upload gethstore/builds
|
|
||||||
|
|
||||||
# This builder does the OSX Azure, iOS CocoaPods and iOS Azure uploads
|
|
||||||
- stage: build
|
|
||||||
if: type = push
|
|
||||||
os: osx
|
|
||||||
go: 1.14.x
|
|
||||||
env:
|
|
||||||
- azure-osx
|
|
||||||
- azure-ios
|
|
||||||
- cocoapods-ios
|
|
||||||
- GO111MODULE=on
|
|
||||||
git:
|
|
||||||
submodules: false # avoid cloning ethereum/tests
|
|
||||||
script:
|
|
||||||
- go run build/ci.go install
|
|
||||||
- go run build/ci.go archive -type tar -signer OSX_SIGNING_KEY -upload gethstore/builds
|
|
||||||
|
|
||||||
# Build the iOS framework and upload it to CocoaPods and Azure
|
|
||||||
- gem uninstall cocoapods -a -x
|
|
||||||
- gem install cocoapods
|
|
||||||
|
|
||||||
- mv ~/.cocoapods/repos/master ~/.cocoapods/repos/master.bak
|
|
||||||
- sed -i '.bak' 's/repo.join/!repo.join/g' $(dirname `gem which cocoapods`)/cocoapods/sources_manager.rb
|
|
||||||
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then git clone --depth=1 https://github.com/CocoaPods/Specs.git ~/.cocoapods/repos/master && pod setup --verbose; fi
|
|
||||||
|
|
||||||
- xctool -version
|
|
||||||
- xcrun simctl list
|
|
||||||
|
|
||||||
# Workaround for https://github.com/golang/go/issues/23749
|
|
||||||
- export CGO_CFLAGS_ALLOW='-fmodules|-fblocks|-fobjc-arc'
|
|
||||||
- go run build/ci.go xcode -signer IOS_SIGNING_KEY -deploy trunk -upload gethstore/builds
|
|
||||||
|
|
||||||
# This builder does the Azure archive purges to avoid accumulating junk
|
# This builder does the Azure archive purges to avoid accumulating junk
|
||||||
- stage: build
|
- stage: build
|
||||||
if: type = cron
|
if: type = cron
|
||||||
|
@ -777,6 +777,10 @@ func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event
|
|||||||
return nullSubscription()
|
return nullSubscription()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (fb *filterBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
|
||||||
|
return nullSubscription()
|
||||||
|
}
|
||||||
|
|
||||||
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
|
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
|
||||||
|
|
||||||
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
|
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {
|
||||||
|
@ -686,7 +686,7 @@ func (api *RetestethAPI) AccountRange(ctx context.Context,
|
|||||||
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if idx == int(txIndex) {
|
if idx == int(txIndex) {
|
||||||
// This is to make sure root can be opened by OpenTrie
|
// This is to make sure root can be opened by OpenTrie
|
||||||
root, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
|
root, _, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return AccountRangeResult{}, err
|
return AccountRangeResult{}, err
|
||||||
}
|
}
|
||||||
@ -796,7 +796,7 @@ func (api *RetestethAPI) StorageRangeAt(ctx context.Context,
|
|||||||
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if idx == int(txIndex) {
|
if idx == int(txIndex) {
|
||||||
// This is to make sure root can be opened by OpenTrie
|
// This is to make sure root can be opened by OpenTrie
|
||||||
_, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
|
_, _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return StorageRangeResult{}, err
|
return StorageRangeResult{}, err
|
||||||
}
|
}
|
||||||
|
@ -160,6 +160,7 @@ type BlockChain struct {
|
|||||||
chainFeed event.Feed
|
chainFeed event.Feed
|
||||||
chainSideFeed event.Feed
|
chainSideFeed event.Feed
|
||||||
chainHeadFeed event.Feed
|
chainHeadFeed event.Feed
|
||||||
|
stateChangeEventFeed event.Feed
|
||||||
logsFeed event.Feed
|
logsFeed event.Feed
|
||||||
blockProcFeed event.Feed
|
blockProcFeed event.Feed
|
||||||
scope event.SubscriptionScope
|
scope event.SubscriptionScope
|
||||||
@ -1434,10 +1435,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
|
|||||||
log.Crit("Failed to write block into disk", "err", err)
|
log.Crit("Failed to write block into disk", "err", err)
|
||||||
}
|
}
|
||||||
// Commit all cached state changes into underlying memory database.
|
// Commit all cached state changes into underlying memory database.
|
||||||
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
|
root, stateChanges, commitErr := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
|
||||||
if err != nil {
|
log.Debug("Sending StateChangeEvent to the feed", "block number", block.Number(), "count", len(stateChanges))
|
||||||
return NonStatTy, err
|
bc.stateChangeEventFeed.Send(StateChangeEvent{block, stateChanges})
|
||||||
|
|
||||||
|
if commitErr != nil {
|
||||||
|
return NonStatTy, commitErr
|
||||||
}
|
}
|
||||||
|
|
||||||
triedb := bc.stateCache.TrieDB()
|
triedb := bc.stateCache.TrieDB()
|
||||||
|
|
||||||
// If we're running an archive node, always flush
|
// If we're running an archive node, always flush
|
||||||
@ -2457,3 +2462,8 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
|
|||||||
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
|
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
|
||||||
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
|
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeStateChangeEvent registers a subscription StateChangeEvent.
|
||||||
|
func (bc *BlockChain) SubscribeStateChangeEvent(ch chan<- StateChangeEvent) event.Subscription {
|
||||||
|
return bc.scope.Track(bc.stateChangeEventFeed.Subscribe(ch))
|
||||||
|
}
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"math/big"
|
"math/big"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
@ -3028,3 +3029,79 @@ func TestInitThenFailCreateContract(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSendingStateChangeEvents(t *testing.T) { testSendingStateChangeEvents(t, 3) }
|
||||||
|
func testSendingStateChangeEvents(t *testing.T, numberOfEventsToSend int) {
|
||||||
|
var (
|
||||||
|
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
|
||||||
|
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
|
||||||
|
key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
|
||||||
|
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
|
||||||
|
db = rawdb.NewMemoryDatabase()
|
||||||
|
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
|
||||||
|
genesis = gspec.MustCommit(db)
|
||||||
|
signer = types.NewEIP155Signer(gspec.Config.ChainID)
|
||||||
|
newEventCh = make(chan bool)
|
||||||
|
)
|
||||||
|
|
||||||
|
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
|
||||||
|
defer blockchain.Stop()
|
||||||
|
|
||||||
|
stateChangeCh := make(chan StateChangeEvent)
|
||||||
|
blockchain.SubscribeStateChangeEvent(stateChangeCh)
|
||||||
|
|
||||||
|
// create numberOfEventsToSend blocks that include State Changes
|
||||||
|
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, numberOfEventsToSend, func(i int, block *BlockGen) {
|
||||||
|
tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
|
||||||
|
block.AddTx(tx)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Spawn a goroutine to receive state change events
|
||||||
|
go validateEventsReceived(t, stateChangeCh, newEventCh, numberOfEventsToSend)
|
||||||
|
if _, err := blockchain.InsertChain(chain); err != nil {
|
||||||
|
t.Fatalf("failed to insert chain: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !<-newEventCh {
|
||||||
|
t.Fatal("failed to receive new event")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateEventsReceived(t *testing.T, sink interface{}, result chan bool, expect int) {
|
||||||
|
chanval := reflect.ValueOf(sink)
|
||||||
|
chantyp := chanval.Type()
|
||||||
|
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.RecvDir == 0 {
|
||||||
|
t.Fatalf("invalid channel, given type %v", chantyp)
|
||||||
|
}
|
||||||
|
cnt := 0
|
||||||
|
var recv []reflect.Value
|
||||||
|
timeout := time.After(4 * time.Second)
|
||||||
|
cases := []reflect.SelectCase{{Chan: chanval, Dir: reflect.SelectRecv}, {Chan: reflect.ValueOf(timeout), Dir: reflect.SelectRecv}}
|
||||||
|
for {
|
||||||
|
chose, v, _ := reflect.Select(cases)
|
||||||
|
if chose == 1 {
|
||||||
|
t.Log("got a timeout")
|
||||||
|
// Not enough event received
|
||||||
|
result <- false
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cnt += 1
|
||||||
|
recv = append(recv, v)
|
||||||
|
if cnt == expect {
|
||||||
|
t.Log("got an event")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
done := time.After(50 * time.Millisecond)
|
||||||
|
cases = cases[:1]
|
||||||
|
cases = append(cases, reflect.SelectCase{Chan: reflect.ValueOf(done), Dir: reflect.SelectRecv})
|
||||||
|
chose, _, _ := reflect.Select(cases)
|
||||||
|
// If chose equal 0, it means receiving redundant events.
|
||||||
|
if chose == 1 {
|
||||||
|
t.Log("is done")
|
||||||
|
result <- true
|
||||||
|
} else {
|
||||||
|
t.Log("got too many events")
|
||||||
|
result <- false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -216,7 +216,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
|
|||||||
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
|
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
|
||||||
|
|
||||||
// Write state changes to db
|
// Write state changes to db
|
||||||
root, err := statedb.Commit(config.IsEIP158(b.header.Number))
|
root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Sprintf("state write error: %v", err))
|
panic(fmt.Sprintf("state write error: %v", err))
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,7 @@ package core
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,6 +37,11 @@ type ChainEvent struct {
|
|||||||
Logs []*types.Log
|
Logs []*types.Log
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type StateChangeEvent struct {
|
||||||
|
*types.Block
|
||||||
|
state.StateChanges
|
||||||
|
}
|
||||||
|
|
||||||
type ChainSideEvent struct {
|
type ChainSideEvent struct {
|
||||||
Block *types.Block
|
Block *types.Block
|
||||||
}
|
}
|
||||||
|
@ -81,6 +81,7 @@ type stateObject struct {
|
|||||||
|
|
||||||
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
|
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
|
||||||
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
|
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
|
||||||
|
diffStorage Storage // Storage entries that need to be emitted with stateDiffs
|
||||||
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
|
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
|
||||||
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
|
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
|
||||||
|
|
||||||
@ -125,6 +126,7 @@ func newObject(db *StateDB, address common.Address, data Account) *stateObject {
|
|||||||
originStorage: make(Storage),
|
originStorage: make(Storage),
|
||||||
pendingStorage: make(Storage),
|
pendingStorage: make(Storage),
|
||||||
dirtyStorage: make(Storage),
|
dirtyStorage: make(Storage),
|
||||||
|
diffStorage: make(Storage),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -321,6 +323,7 @@ func (s *stateObject) updateTrie(db Database) Trie {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.originStorage[key] = value
|
s.originStorage[key] = value
|
||||||
|
s.diffStorage[key] = value
|
||||||
|
|
||||||
var v []byte
|
var v []byte
|
||||||
if (value == common.Hash{}) {
|
if (value == common.Hash{}) {
|
||||||
@ -423,6 +426,7 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
|
|||||||
stateObject.dirtyStorage = s.dirtyStorage.Copy()
|
stateObject.dirtyStorage = s.dirtyStorage.Copy()
|
||||||
stateObject.originStorage = s.originStorage.Copy()
|
stateObject.originStorage = s.originStorage.Copy()
|
||||||
stateObject.pendingStorage = s.pendingStorage.Copy()
|
stateObject.pendingStorage = s.pendingStorage.Copy()
|
||||||
|
stateObject.diffStorage = s.diffStorage.Copy()
|
||||||
stateObject.suicided = s.suicided
|
stateObject.suicided = s.suicided
|
||||||
stateObject.dirtyCode = s.dirtyCode
|
stateObject.dirtyCode = s.dirtyCode
|
||||||
stateObject.deleted = s.deleted
|
stateObject.deleted = s.deleted
|
||||||
|
@ -167,7 +167,7 @@ func TestSnapshot2(t *testing.T) {
|
|||||||
so0.deleted = false
|
so0.deleted = false
|
||||||
state.setStateObject(so0)
|
state.setStateObject(so0)
|
||||||
|
|
||||||
root, _ := state.Commit(false)
|
root, _, _ := state.Commit(false)
|
||||||
state.Reset(root)
|
state.Reset(root)
|
||||||
|
|
||||||
// and one with deleted == true
|
// and one with deleted == true
|
||||||
|
@ -802,28 +802,52 @@ func (s *StateDB) clearJournalAndRefund() {
|
|||||||
s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires
|
s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit writes the state to the underlying in-memory trie database.
|
// ModifiedAccount is an Account and it's Storage trie that has changed in the current block.
|
||||||
func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
|
type ModifiedAccount struct {
|
||||||
if s.dbErr != nil {
|
Account
|
||||||
return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
|
Storage
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StateChanges are a map between an Account's address to it's ModifiedAccount.
|
||||||
|
type StateChanges map[common.Address]ModifiedAccount
|
||||||
|
|
||||||
|
// Commit writes the state to the underlying in-memory trie database.
|
||||||
|
func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, StateChanges, error) {
|
||||||
|
if s.dbErr != nil {
|
||||||
|
return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
|
||||||
|
}
|
||||||
|
|
||||||
// Finalize any pending changes and merge everything into the tries
|
// Finalize any pending changes and merge everything into the tries
|
||||||
s.IntermediateRoot(deleteEmptyObjects)
|
s.IntermediateRoot(deleteEmptyObjects)
|
||||||
|
|
||||||
|
stateChanges := make(StateChanges)
|
||||||
|
|
||||||
// Commit objects to the trie, measuring the elapsed time
|
// Commit objects to the trie, measuring the elapsed time
|
||||||
for addr := range s.stateObjectsDirty {
|
for addr := range s.stateObjectsDirty {
|
||||||
|
modifiedAccount := ModifiedAccount{}
|
||||||
|
|
||||||
if obj := s.stateObjects[addr]; !obj.deleted {
|
if obj := s.stateObjects[addr]; !obj.deleted {
|
||||||
// Write any contract code associated with the state object
|
// Write any contract code associated with the state object
|
||||||
if obj.code != nil && obj.dirtyCode {
|
if obj.code != nil && obj.dirtyCode {
|
||||||
s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code)
|
s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code)
|
||||||
obj.dirtyCode = false
|
obj.dirtyCode = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add the account to the modifiedAccounts map
|
||||||
|
modifiedAccount = ModifiedAccount{Account: obj.data}
|
||||||
|
// Add the diff storage to the modifiedAccounts map
|
||||||
|
modifiedAccount.Storage = obj.diffStorage
|
||||||
|
obj.diffStorage = make(Storage)
|
||||||
|
|
||||||
// Write any storage changes in the state object to its storage trie
|
// Write any storage changes in the state object to its storage trie
|
||||||
if err := obj.CommitTrie(s.db); err != nil {
|
if err := obj.CommitTrie(s.db); err != nil {
|
||||||
return common.Hash{}, err
|
return common.Hash{}, StateChanges{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stateChanges[addr] = modifiedAccount
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.stateObjectsDirty) > 0 {
|
if len(s.stateObjectsDirty) > 0 {
|
||||||
s.stateObjectsDirty = make(map[common.Address]struct{})
|
s.stateObjectsDirty = make(map[common.Address]struct{})
|
||||||
}
|
}
|
||||||
@ -867,5 +891,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
|
|||||||
}
|
}
|
||||||
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
|
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
|
||||||
}
|
}
|
||||||
return root, err
|
|
||||||
|
return root, stateChanges, err
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,109 @@ func TestUpdateLeaks(t *testing.T) {
|
|||||||
it.Release()
|
it.Release()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateChangesEmittedFromCommit(t *testing.T) {
|
||||||
|
// Create an empty state database
|
||||||
|
db := rawdb.NewMemoryDatabase()
|
||||||
|
state, _ := New(common.Hash{}, NewDatabase(db), nil)
|
||||||
|
addr1 := common.BytesToAddress([]byte{1, 2, 3, 4})
|
||||||
|
addr2 := common.BytesToAddress([]byte{5, 6, 7, 8})
|
||||||
|
expectedModifiedAccount1 := getNewModifiedAccount()
|
||||||
|
expectedModifiedAccount2 := getNewModifiedAccount()
|
||||||
|
|
||||||
|
// Commit 1
|
||||||
|
// set balance and storage for addr1
|
||||||
|
addr1Balance := big.NewInt(rand.Int63())
|
||||||
|
state.SetBalance(addr1, addr1Balance)
|
||||||
|
expectedModifiedAccount1.Account.Balance = addr1Balance
|
||||||
|
|
||||||
|
storageKey := common.BytesToHash([]byte{1})
|
||||||
|
storageValue := common.BytesToHash([]byte{1, 2, 3})
|
||||||
|
state.SetState(addr1, storageKey, storageValue)
|
||||||
|
expectedModifiedAccount1.Storage[storageKey] = storageValue
|
||||||
|
|
||||||
|
// set balance for addr2
|
||||||
|
addr2Balance := big.NewInt(rand.Int63())
|
||||||
|
state.SetBalance(addr2, addr2Balance)
|
||||||
|
expectedModifiedAccount2.Account.Balance = addr2Balance
|
||||||
|
|
||||||
|
expectedStateChangesOne := make(StateChanges)
|
||||||
|
expectedStateChangesOne[addr1] = expectedModifiedAccount1
|
||||||
|
expectedStateChangesOne[addr2] = expectedModifiedAccount2
|
||||||
|
|
||||||
|
_, actualStateChangesOne, commitErr := state.Commit(false)
|
||||||
|
if commitErr != nil {
|
||||||
|
t.Errorf("error committing to statedb")
|
||||||
|
}
|
||||||
|
|
||||||
|
assertStateChanges(actualStateChangesOne, expectedStateChangesOne, t)
|
||||||
|
|
||||||
|
// Commit 2: Update existing accounts
|
||||||
|
expectedStateChangesTwo := make(StateChanges)
|
||||||
|
|
||||||
|
// update balance and storage for addr1
|
||||||
|
newBalanceToAdd := big.NewInt(100)
|
||||||
|
state.AddBalance(addr1, newBalanceToAdd)
|
||||||
|
newAddr1Balance := newBalanceToAdd.Int64() + addr1Balance.Int64()
|
||||||
|
expectedModifiedAccount1.Account.Balance = big.NewInt(newAddr1Balance)
|
||||||
|
|
||||||
|
updatedStorageValue := common.BytesToHash([]byte{0}) // setting storage value back to zero to make sure that we're capturing zero value diffs
|
||||||
|
state.SetState(addr1, storageKey, updatedStorageValue)
|
||||||
|
expectedModifiedAccount1.Storage[storageKey] = updatedStorageValue
|
||||||
|
expectedStateChangesTwo[addr1] = expectedModifiedAccount1
|
||||||
|
|
||||||
|
_, stateChangesTwo, commitTwoErr := state.Commit(false)
|
||||||
|
if commitTwoErr != nil {
|
||||||
|
t.Errorf("error committing to statedb")
|
||||||
|
}
|
||||||
|
|
||||||
|
assertStateChanges(stateChangesTwo, expectedStateChangesTwo, t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getNewModifiedAccount() ModifiedAccount {
|
||||||
|
return ModifiedAccount{
|
||||||
|
Storage: make(map[common.Hash]common.Hash),
|
||||||
|
Account: Account{
|
||||||
|
Nonce: 0,
|
||||||
|
Balance: big.NewInt(0),
|
||||||
|
Root: emptyRoot,
|
||||||
|
CodeHash: emptyCodeHash,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertStateChanges(actualChanges, expectedChanges StateChanges, t *testing.T) {
|
||||||
|
if len(actualChanges) != len(expectedChanges) {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Logf("Mismatch in number of StateChanges.ModifiedAccounts. actual: %v, expected: %v", len(actualChanges), len(expectedChanges))
|
||||||
|
}
|
||||||
|
|
||||||
|
for addr, account := range actualChanges {
|
||||||
|
expected := expectedChanges[addr]
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(account.Nonce, expected.Nonce) {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Errorf("Account Nonce does not match expected. actual: %v, expected: %v", account.Nonce, expected.Nonce)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(account.Balance, expected.Balance) {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Errorf("Account Balance does not match expected. actual: %v, expected: %v", account.Balance, expected.Balance)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(account.CodeHash, expected.CodeHash) {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Errorf("Account CodeHash does not match expected. actual: %v, expected: %v", account.CodeHash, expected.CodeHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Not asserting on the Account's Root because it's not easy to get the root between `SetState` calls
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(account.Storage, expected.Storage) {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Errorf("Account Storage does not match expected. actual: %v, expected: %v", account.Storage, expected.Storage)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Tests that no intermediate state of an object is stored into the database,
|
// Tests that no intermediate state of an object is stored into the database,
|
||||||
// only the one right before the commit.
|
// only the one right before the commit.
|
||||||
func TestIntermediateLeaks(t *testing.T) {
|
func TestIntermediateLeaks(t *testing.T) {
|
||||||
@ -102,7 +205,7 @@ func TestIntermediateLeaks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Commit and cross check the databases.
|
// Commit and cross check the databases.
|
||||||
transRoot, err := transState.Commit(false)
|
transRoot, _, err := transState.Commit(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to commit transition state: %v", err)
|
t.Fatalf("failed to commit transition state: %v", err)
|
||||||
}
|
}
|
||||||
@ -110,7 +213,7 @@ func TestIntermediateLeaks(t *testing.T) {
|
|||||||
t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
|
t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
|
||||||
}
|
}
|
||||||
|
|
||||||
finalRoot, err := finalState.Commit(false)
|
finalRoot, _, err := finalState.Commit(false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to commit final state: %v", err)
|
t.Fatalf("failed to commit final state: %v", err)
|
||||||
}
|
}
|
||||||
@ -459,7 +562,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
|
|||||||
func TestTouchDelete(t *testing.T) {
|
func TestTouchDelete(t *testing.T) {
|
||||||
s := newStateTest()
|
s := newStateTest()
|
||||||
s.state.GetOrNewStateObject(common.Address{})
|
s.state.GetOrNewStateObject(common.Address{})
|
||||||
root, _ := s.state.Commit(false)
|
root, _, _ := s.state.Commit(false)
|
||||||
s.state.Reset(root)
|
s.state.Reset(root)
|
||||||
|
|
||||||
snapshot := s.state.Snapshot()
|
snapshot := s.state.Snapshot()
|
||||||
@ -661,7 +764,7 @@ func TestDeleteCreateRevert(t *testing.T) {
|
|||||||
addr := toAddr([]byte("so"))
|
addr := toAddr([]byte("so"))
|
||||||
state.SetBalance(addr, big.NewInt(1))
|
state.SetBalance(addr, big.NewInt(1))
|
||||||
|
|
||||||
root, _ := state.Commit(false)
|
root, _, _ := state.Commit(false)
|
||||||
state.Reset(root)
|
state.Reset(root)
|
||||||
|
|
||||||
// Simulate self-destructing in one transaction, then create-reverting in another
|
// Simulate self-destructing in one transaction, then create-reverting in another
|
||||||
@ -673,7 +776,7 @@ func TestDeleteCreateRevert(t *testing.T) {
|
|||||||
state.RevertToSnapshot(id)
|
state.RevertToSnapshot(id)
|
||||||
|
|
||||||
// Commit the entire state and make sure we don't crash and have the correct state
|
// Commit the entire state and make sure we don't crash and have the correct state
|
||||||
root, _ = state.Commit(true)
|
root, _, _ = state.Commit(true)
|
||||||
state.Reset(root)
|
state.Reset(root)
|
||||||
|
|
||||||
if state.getStateObject(addr) != nil {
|
if state.getStateObject(addr) != nil {
|
||||||
@ -698,7 +801,7 @@ func TestMissingTrieNodes(t *testing.T) {
|
|||||||
a2 := toAddr([]byte("another"))
|
a2 := toAddr([]byte("another"))
|
||||||
state.SetBalance(a2, big.NewInt(100))
|
state.SetBalance(a2, big.NewInt(100))
|
||||||
state.SetCode(a2, []byte{1, 2, 4})
|
state.SetCode(a2, []byte{1, 2, 4})
|
||||||
root, _ = state.Commit(false)
|
root, _, _ = state.Commit(false)
|
||||||
t.Logf("root: %x", root)
|
t.Logf("root: %x", root)
|
||||||
// force-flush
|
// force-flush
|
||||||
state.Database().TrieDB().Cap(0)
|
state.Database().TrieDB().Cap(0)
|
||||||
@ -722,7 +825,7 @@ func TestMissingTrieNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Modify the state
|
// Modify the state
|
||||||
state.SetBalance(addr, big.NewInt(2))
|
state.SetBalance(addr, big.NewInt(2))
|
||||||
root, err := state.Commit(false)
|
root, _, err := state.Commit(false)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatalf("expected error, got root :%x", root)
|
t.Fatalf("expected error, got root :%x", root)
|
||||||
}
|
}
|
||||||
|
@ -62,7 +62,7 @@ func makeTestState() (Database, common.Hash, []*testAccount) {
|
|||||||
state.updateStateObject(obj)
|
state.updateStateObject(obj)
|
||||||
accounts = append(accounts, acc)
|
accounts = append(accounts, acc)
|
||||||
}
|
}
|
||||||
root, _ := state.Commit(false)
|
root, _, _ := state.Commit(false)
|
||||||
|
|
||||||
// Return the generated state
|
// Return the generated state
|
||||||
return db, root, accounts
|
return db, root, accounts
|
||||||
|
@ -220,6 +220,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
|
|||||||
return b.eth.BlockChain().SubscribeLogsEvent(ch)
|
return b.eth.BlockChain().SubscribeLogsEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *EthAPIBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
|
||||||
|
return b.eth.BlockChain().SubscribeStateChangeEvent(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
|
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
|
||||||
return b.eth.txPool.AddLocal(signedTx)
|
return b.eth.txPool.AddLocal(signedTx)
|
||||||
}
|
}
|
||||||
|
@ -295,7 +295,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
// Finalize the state so any modifications are written to the trie
|
// Finalize the state so any modifications are written to the trie
|
||||||
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failed = err
|
failed = err
|
||||||
break
|
break
|
||||||
@ -681,7 +681,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
|
|||||||
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
|
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
|
||||||
}
|
}
|
||||||
// Finalize the state so any modifications are written to the trie
|
// Finalize the state so any modifications are written to the trie
|
||||||
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
ethereum "github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
@ -233,6 +233,35 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
|
|||||||
return rpcSub, nil
|
return rpcSub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (api *PublicFilterAPI) NewStateChanges(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
||||||
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
if !supported {
|
||||||
|
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
|
||||||
|
}
|
||||||
|
|
||||||
|
rpcSub := notifier.CreateSubscription()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
stateChanges := make(chan Payload)
|
||||||
|
stateChangeSub := api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges)
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case s := <-stateChanges:
|
||||||
|
notifier.Notify(rpcSub.ID, s)
|
||||||
|
case <-rpcSub.Err():
|
||||||
|
stateChangeSub.Unsubscribe()
|
||||||
|
return
|
||||||
|
case <-notifier.Closed():
|
||||||
|
stateChangeSub.Unsubscribe()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return rpcSub, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Logs creates a subscription that fires for all new log that match the given filter criteria.
|
// Logs creates a subscription that fires for all new log that match the given filter criteria.
|
||||||
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
|
||||||
notifier, supported := rpc.NotifierFromContext(ctx)
|
notifier, supported := rpc.NotifierFromContext(ctx)
|
||||||
|
@ -42,6 +42,7 @@ type Backend interface {
|
|||||||
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
|
||||||
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
|
||||||
|
SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription
|
||||||
|
|
||||||
BloomStatus() (uint64, uint64)
|
BloomStatus() (uint64, uint64)
|
||||||
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
|
||||||
|
@ -24,7 +24,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
ethereum "github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
@ -52,6 +52,8 @@ const (
|
|||||||
PendingTransactionsSubscription
|
PendingTransactionsSubscription
|
||||||
// BlocksSubscription queries hashes for blocks that are imported
|
// BlocksSubscription queries hashes for blocks that are imported
|
||||||
BlocksSubscription
|
BlocksSubscription
|
||||||
|
// StateChangeSubscription queries for state changes between blocks being imported
|
||||||
|
StateChangeSubscription
|
||||||
// LastSubscription keeps track of the last index
|
// LastSubscription keeps track of the last index
|
||||||
LastIndexSubscription
|
LastIndexSubscription
|
||||||
)
|
)
|
||||||
@ -66,16 +68,19 @@ const (
|
|||||||
logsChanSize = 10
|
logsChanSize = 10
|
||||||
// chainEvChanSize is the size of channel listening to ChainEvent.
|
// chainEvChanSize is the size of channel listening to ChainEvent.
|
||||||
chainEvChanSize = 10
|
chainEvChanSize = 10
|
||||||
|
// stateChangeChanSize is the size of the channel listening to StateChangeEvent.
|
||||||
|
stateChangeChanSize = 10
|
||||||
)
|
)
|
||||||
|
|
||||||
type subscription struct {
|
type subscription struct {
|
||||||
id rpc.ID
|
id rpc.ID
|
||||||
typ Type
|
typ Type
|
||||||
created time.Time
|
created time.Time
|
||||||
logsCrit ethereum.FilterQuery
|
filterCrit ethereum.FilterQuery
|
||||||
logs chan []*types.Log
|
logs chan []*types.Log
|
||||||
hashes chan []common.Hash
|
hashes chan []common.Hash
|
||||||
headers chan *types.Header
|
headers chan *types.Header
|
||||||
|
stateChangePayloads chan Payload
|
||||||
installed chan struct{} // closed when the filter is installed
|
installed chan struct{} // closed when the filter is installed
|
||||||
err chan error // closed when the filter is uninstalled
|
err chan error // closed when the filter is uninstalled
|
||||||
}
|
}
|
||||||
@ -93,6 +98,7 @@ type EventSystem struct {
|
|||||||
rmLogsSub event.Subscription // Subscription for removed log event
|
rmLogsSub event.Subscription // Subscription for removed log event
|
||||||
pendingLogsSub event.Subscription // Subscription for pending log event
|
pendingLogsSub event.Subscription // Subscription for pending log event
|
||||||
chainSub event.Subscription // Subscription for new chain event
|
chainSub event.Subscription // Subscription for new chain event
|
||||||
|
stateChangeEventSub event.Subscription // Subscription for new state change event
|
||||||
|
|
||||||
// Channels
|
// Channels
|
||||||
install chan *subscription // install filter for event notification
|
install chan *subscription // install filter for event notification
|
||||||
@ -102,6 +108,7 @@ type EventSystem struct {
|
|||||||
pendingLogsCh chan []*types.Log // Channel to receive new log event
|
pendingLogsCh chan []*types.Log // Channel to receive new log event
|
||||||
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
|
||||||
chainCh chan core.ChainEvent // Channel to receive new chain event
|
chainCh chan core.ChainEvent // Channel to receive new chain event
|
||||||
|
stateChangeEventChan chan core.StateChangeEvent // Channel to receive new state change event
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEventSystem creates a new manager that listens for event on the given mux,
|
// NewEventSystem creates a new manager that listens for event on the given mux,
|
||||||
@ -121,6 +128,7 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
|
|||||||
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
|
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
|
||||||
pendingLogsCh: make(chan []*types.Log, logsChanSize),
|
pendingLogsCh: make(chan []*types.Log, logsChanSize),
|
||||||
chainCh: make(chan core.ChainEvent, chainEvChanSize),
|
chainCh: make(chan core.ChainEvent, chainEvChanSize),
|
||||||
|
stateChangeEventChan: make(chan core.StateChangeEvent, stateChangeChanSize),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe events
|
// Subscribe events
|
||||||
@ -129,9 +137,10 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
|
|||||||
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
|
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
|
||||||
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
|
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
|
||||||
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
|
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
|
||||||
|
m.stateChangeEventSub = m.backend.SubscribeStateChangeEvent(m.stateChangeEventChan)
|
||||||
|
|
||||||
// Make sure none of the subscriptions are empty
|
// Make sure none of the subscriptions are empty
|
||||||
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
|
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.stateChangeEventSub == nil {
|
||||||
log.Crit("Subscribe for event system failed")
|
log.Crit("Subscribe for event system failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,6 +176,7 @@ func (sub *Subscription) Unsubscribe() {
|
|||||||
case <-sub.f.logs:
|
case <-sub.f.logs:
|
||||||
case <-sub.f.hashes:
|
case <-sub.f.hashes:
|
||||||
case <-sub.f.headers:
|
case <-sub.f.headers:
|
||||||
|
case <-sub.f.stateChangePayloads:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,11 +239,12 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
|
|||||||
sub := &subscription{
|
sub := &subscription{
|
||||||
id: rpc.NewID(),
|
id: rpc.NewID(),
|
||||||
typ: MinedAndPendingLogsSubscription,
|
typ: MinedAndPendingLogsSubscription,
|
||||||
logsCrit: crit,
|
filterCrit: crit,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
logs: logs,
|
logs: logs,
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateChangePayloads: make(chan Payload),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -246,11 +257,12 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
|
|||||||
sub := &subscription{
|
sub := &subscription{
|
||||||
id: rpc.NewID(),
|
id: rpc.NewID(),
|
||||||
typ: LogsSubscription,
|
typ: LogsSubscription,
|
||||||
logsCrit: crit,
|
filterCrit: crit,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
logs: logs,
|
logs: logs,
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateChangePayloads: make(chan Payload),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -263,11 +275,12 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
|
|||||||
sub := &subscription{
|
sub := &subscription{
|
||||||
id: rpc.NewID(),
|
id: rpc.NewID(),
|
||||||
typ: PendingLogsSubscription,
|
typ: PendingLogsSubscription,
|
||||||
logsCrit: crit,
|
filterCrit: crit,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
logs: logs,
|
logs: logs,
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateChangePayloads: make(chan Payload),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -284,6 +297,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
|
|||||||
logs: make(chan []*types.Log),
|
logs: make(chan []*types.Log),
|
||||||
hashes: make(chan []common.Hash),
|
hashes: make(chan []common.Hash),
|
||||||
headers: headers,
|
headers: headers,
|
||||||
|
stateChangePayloads: make(chan Payload),
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -300,6 +314,25 @@ func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscript
|
|||||||
logs: make(chan []*types.Log),
|
logs: make(chan []*types.Log),
|
||||||
hashes: hashes,
|
hashes: hashes,
|
||||||
headers: make(chan *types.Header),
|
headers: make(chan *types.Header),
|
||||||
|
stateChangePayloads: make(chan Payload),
|
||||||
|
installed: make(chan struct{}),
|
||||||
|
err: make(chan error),
|
||||||
|
}
|
||||||
|
return es.subscribe(sub)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SubscribeStateChanges creates a subscription that writes new state changes that are identified
|
||||||
|
// for each new block.
|
||||||
|
func (es *EventSystem) SubscribeStateChanges(crit ethereum.FilterQuery, stateChanges chan Payload) *Subscription {
|
||||||
|
sub := &subscription{
|
||||||
|
id: rpc.NewID(),
|
||||||
|
typ: StateChangeSubscription,
|
||||||
|
filterCrit: crit,
|
||||||
|
created: time.Now(),
|
||||||
|
logs: make(chan []*types.Log),
|
||||||
|
hashes: make(chan []common.Hash),
|
||||||
|
headers: make(chan *types.Header),
|
||||||
|
stateChangePayloads: stateChanges,
|
||||||
installed: make(chan struct{}),
|
installed: make(chan struct{}),
|
||||||
err: make(chan error),
|
err: make(chan error),
|
||||||
}
|
}
|
||||||
@ -313,7 +346,7 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, f := range filters[LogsSubscription] {
|
for _, f := range filters[LogsSubscription] {
|
||||||
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
|
matchedLogs := filterLogs(ev, f.filterCrit.FromBlock, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
|
||||||
if len(matchedLogs) > 0 {
|
if len(matchedLogs) > 0 {
|
||||||
f.logs <- matchedLogs
|
f.logs <- matchedLogs
|
||||||
}
|
}
|
||||||
@ -325,7 +358,7 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, f := range filters[PendingLogsSubscription] {
|
for _, f := range filters[PendingLogsSubscription] {
|
||||||
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
|
matchedLogs := filterLogs(ev, nil, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
|
||||||
if len(matchedLogs) > 0 {
|
if len(matchedLogs) > 0 {
|
||||||
f.logs <- matchedLogs
|
f.logs <- matchedLogs
|
||||||
}
|
}
|
||||||
@ -334,7 +367,7 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
|
|||||||
|
|
||||||
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
|
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
|
||||||
for _, f := range filters[LogsSubscription] {
|
for _, f := range filters[LogsSubscription] {
|
||||||
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
|
matchedLogs := filterLogs(ev.Logs, f.filterCrit.FromBlock, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
|
||||||
if len(matchedLogs) > 0 {
|
if len(matchedLogs) > 0 {
|
||||||
f.logs <- matchedLogs
|
f.logs <- matchedLogs
|
||||||
}
|
}
|
||||||
@ -358,7 +391,7 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
|
|||||||
if es.lightMode && len(filters[LogsSubscription]) > 0 {
|
if es.lightMode && len(filters[LogsSubscription]) > 0 {
|
||||||
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
|
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
|
||||||
for _, f := range filters[LogsSubscription] {
|
for _, f := range filters[LogsSubscription] {
|
||||||
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
|
if matchedLogs := es.lightFilterLogs(header, f.filterCrit.Addresses, f.filterCrit.Topics, remove); len(matchedLogs) > 0 {
|
||||||
f.logs <- matchedLogs
|
f.logs <- matchedLogs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -366,6 +399,20 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (es *EventSystem) handleStateChangeEvent(filters filterIndex, ev core.StateChangeEvent) {
|
||||||
|
for _, f := range filters[StateChangeSubscription] {
|
||||||
|
payload, processingErr := processStateChanges(ev, f.filterCrit)
|
||||||
|
if processingErr != nil {
|
||||||
|
f.err <- processingErr
|
||||||
|
}
|
||||||
|
|
||||||
|
empty := isPayloadEmpty(payload)
|
||||||
|
if !empty {
|
||||||
|
f.stateChangePayloads <- payload
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
|
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
|
||||||
oldh := es.lastHead
|
oldh := es.lastHead
|
||||||
es.lastHead = newHeader
|
es.lastHead = newHeader
|
||||||
@ -448,6 +495,7 @@ func (es *EventSystem) eventLoop() {
|
|||||||
es.rmLogsSub.Unsubscribe()
|
es.rmLogsSub.Unsubscribe()
|
||||||
es.pendingLogsSub.Unsubscribe()
|
es.pendingLogsSub.Unsubscribe()
|
||||||
es.chainSub.Unsubscribe()
|
es.chainSub.Unsubscribe()
|
||||||
|
es.stateChangeEventSub.Unsubscribe()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
index := make(filterIndex)
|
index := make(filterIndex)
|
||||||
@ -467,6 +515,8 @@ func (es *EventSystem) eventLoop() {
|
|||||||
es.handlePendingLogs(index, ev)
|
es.handlePendingLogs(index, ev)
|
||||||
case ev := <-es.chainCh:
|
case ev := <-es.chainCh:
|
||||||
es.handleChainEvent(index, ev)
|
es.handleChainEvent(index, ev)
|
||||||
|
case ev := <-es.stateChangeEventChan:
|
||||||
|
es.handleStateChangeEvent(index, ev)
|
||||||
|
|
||||||
case f := <-es.install:
|
case f := <-es.install:
|
||||||
if f.typ == MinedAndPendingLogsSubscription {
|
if f.typ == MinedAndPendingLogsSubscription {
|
||||||
@ -497,6 +547,8 @@ func (es *EventSystem) eventLoop() {
|
|||||||
return
|
return
|
||||||
case <-es.chainSub.Err():
|
case <-es.chainSub.Err():
|
||||||
return
|
return
|
||||||
|
case <-es.stateChangeEventSub.Err():
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
package filters
|
package filters
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/big"
|
"math/big"
|
||||||
@ -25,16 +26,18 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
ethereum "github.com/ethereum/go-ethereum"
|
"github.com/ethereum/go-ethereum"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/consensus/ethash"
|
"github.com/ethereum/go-ethereum/consensus/ethash"
|
||||||
"github.com/ethereum/go-ethereum/core"
|
"github.com/ethereum/go-ethereum/core"
|
||||||
"github.com/ethereum/go-ethereum/core/bloombits"
|
"github.com/ethereum/go-ethereum/core/bloombits"
|
||||||
"github.com/ethereum/go-ethereum/core/rawdb"
|
"github.com/ethereum/go-ethereum/core/rawdb"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethdb"
|
"github.com/ethereum/go-ethereum/ethdb"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/params"
|
"github.com/ethereum/go-ethereum/params"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -47,6 +50,7 @@ type testBackend struct {
|
|||||||
rmLogsFeed event.Feed
|
rmLogsFeed event.Feed
|
||||||
pendingLogsFeed event.Feed
|
pendingLogsFeed event.Feed
|
||||||
chainFeed event.Feed
|
chainFeed event.Feed
|
||||||
|
stateChangeFeed event.Feed
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *testBackend) ChainDb() ethdb.Database {
|
func (b *testBackend) ChainDb() ethdb.Database {
|
||||||
@ -121,6 +125,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
|
|||||||
return b.chainFeed.Subscribe(ch)
|
return b.chainFeed.Subscribe(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *testBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
|
||||||
|
return b.stateChangeFeed.Subscribe(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *testBackend) BloomStatus() (uint64, uint64) {
|
func (b *testBackend) BloomStatus() (uint64, uint64) {
|
||||||
return params.BloomBitsBlocks, b.sections
|
return params.BloomBitsBlocks, b.sections
|
||||||
}
|
}
|
||||||
@ -608,3 +616,225 @@ func flattenLogs(pl [][]*types.Log) []*types.Log {
|
|||||||
}
|
}
|
||||||
return logs
|
return logs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateChangeSubscription(t *testing.T) {
|
||||||
|
var (
|
||||||
|
db = rawdb.NewMemoryDatabase()
|
||||||
|
backend = &testBackend{db: db}
|
||||||
|
api = NewPublicFilterAPI(backend, false)
|
||||||
|
genesis = new(core.Genesis).MustCommit(db)
|
||||||
|
numberOfBlocks = 3
|
||||||
|
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, numberOfBlocks, func(i int, gen *core.BlockGen) {})
|
||||||
|
|
||||||
|
address1 = common.HexToAddress("0x1")
|
||||||
|
address2 = common.HexToAddress("0x2")
|
||||||
|
|
||||||
|
modifiedAccount1 = state.ModifiedAccount{
|
||||||
|
Account: state.Account{
|
||||||
|
Nonce: 0,
|
||||||
|
Balance: big.NewInt(100),
|
||||||
|
Root: common.HexToHash("0x01"),
|
||||||
|
},
|
||||||
|
Storage: nil,
|
||||||
|
}
|
||||||
|
|
||||||
|
modifiedAccount2 = state.ModifiedAccount{
|
||||||
|
Account: state.Account{
|
||||||
|
Nonce: 0,
|
||||||
|
Balance: big.NewInt(200),
|
||||||
|
Root: common.HexToHash("0x02"),
|
||||||
|
},
|
||||||
|
Storage: state.Storage{common.HexToHash("0x0002"): common.HexToHash("0x00002")},
|
||||||
|
}
|
||||||
|
|
||||||
|
accountDiff1 = getAccountDiff(address1, modifiedAccount1, t)
|
||||||
|
accountDiff2 = getAccountDiff(address2, modifiedAccount2, t)
|
||||||
|
|
||||||
|
block0 = chain[0]
|
||||||
|
block1 = chain[1]
|
||||||
|
block2 = chain[2]
|
||||||
|
|
||||||
|
emptyStateChangeEvent = core.StateChangeEvent{Block: block0}
|
||||||
|
blockOneStateChangeEvent = core.StateChangeEvent{
|
||||||
|
Block: block1,
|
||||||
|
StateChanges: state.StateChanges{
|
||||||
|
address1: modifiedAccount1,
|
||||||
|
address2: modifiedAccount2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
blockTwoStateChangeEvent = core.StateChangeEvent{
|
||||||
|
Block: block2,
|
||||||
|
StateChanges: state.StateChanges{
|
||||||
|
address1: modifiedAccount1,
|
||||||
|
address2: modifiedAccount2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
testCases = []struct {
|
||||||
|
description string
|
||||||
|
stateChangeEvents []core.StateChangeEvent
|
||||||
|
expectedPayloads []Payload
|
||||||
|
crit ethereum.FilterQuery
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"when block 0 has no state changes no payload for that block is returned",
|
||||||
|
[]core.StateChangeEvent{emptyStateChangeEvent, blockOneStateChangeEvent, blockTwoStateChangeEvent},
|
||||||
|
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1, accountDiff2}, t),
|
||||||
|
ethereum.FilterQuery{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"when watching state changes from specific contracts",
|
||||||
|
[]core.StateChangeEvent{blockOneStateChangeEvent, blockTwoStateChangeEvent, emptyStateChangeEvent},
|
||||||
|
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1}, t),
|
||||||
|
ethereum.FilterQuery{Addresses: []common.Address{address1}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"when no specific addresses are configured all state changes are returned",
|
||||||
|
[]core.StateChangeEvent{blockOneStateChangeEvent, blockTwoStateChangeEvent, emptyStateChangeEvent},
|
||||||
|
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1, accountDiff2}, t),
|
||||||
|
ethereum.FilterQuery{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, test := range testCases {
|
||||||
|
chan0 := make(chan Payload)
|
||||||
|
sub0 := api.events.SubscribeStateChanges(test.crit, chan0)
|
||||||
|
|
||||||
|
payloads := make([]Payload, 0, len(test.expectedPayloads))
|
||||||
|
go func() {
|
||||||
|
// simulate client
|
||||||
|
// keep this loop open for numberOfBlocks - 1 because the first block has no StateChangeEvents and should
|
||||||
|
// not send an event to the client via the subscription
|
||||||
|
for index := 0; index < numberOfBlocks-1; index++ {
|
||||||
|
payload := <-chan0
|
||||||
|
payloads = append(payloads, payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
sub0.Unsubscribe()
|
||||||
|
}()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
// send the state change events to the subscriber
|
||||||
|
for _, e := range test.stateChangeEvents {
|
||||||
|
backend.stateChangeFeed.Send(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
<-sub0.Err()
|
||||||
|
|
||||||
|
expectedNumberOfPayloads := len(test.expectedPayloads)
|
||||||
|
if len(payloads) != expectedNumberOfPayloads {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: %+v", len(payloads), expectedNumberOfPayloads)
|
||||||
|
}
|
||||||
|
|
||||||
|
for index, payload := range payloads {
|
||||||
|
var actualStateDiff, expectedStateDiff StateDiff
|
||||||
|
actualRLP := payload.StateDiffRlp
|
||||||
|
err := rlp.DecodeBytes(actualRLP, &actualStateDiff)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error decoding state diff RLP into state diff: %s: %s", err, test.description)
|
||||||
|
}
|
||||||
|
expectedRLP := test.expectedPayloads[index].StateDiffRlp
|
||||||
|
err = rlp.DecodeBytes(expectedRLP, &expectedStateDiff)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("error decoding state diff RLP into state diff: %s: %s", err, test.description)
|
||||||
|
}
|
||||||
|
if expectedStateDiff.BlockNumber.Cmp(actualStateDiff.BlockNumber) != 0 {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload block number equal expected.\nactual:%+v\nexpected: %+v", actualStateDiff.BlockNumber, expectedStateDiff.BlockNumber)
|
||||||
|
}
|
||||||
|
if expectedStateDiff.BlockHash != actualStateDiff.BlockHash {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload block hash equal expected.\nactual:%+v\nexpected: %+v", actualStateDiff.BlockHash, expectedStateDiff.BlockHash)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The updated accounts are not necessarily in the same order for the state diffs.
|
||||||
|
// Put the actual in a map for easier lookup.
|
||||||
|
actualUpdatedAccounts := make(map[string]AccountDiff)
|
||||||
|
for _, account := range actualStateDiff.UpdatedAccounts {
|
||||||
|
actualUpdatedAccounts[string(account.Key)] = account
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, e := range expectedStateDiff.UpdatedAccounts {
|
||||||
|
actualUpdatedAccount := actualUpdatedAccounts[string(e.Key)]
|
||||||
|
|
||||||
|
if !bytes.Equal(actualUpdatedAccount.Key, e.Key) {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload updated account key equal expected.\nactual:%+v\nexpected: %+v", actualUpdatedAccount.Key, e.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(actualUpdatedAccount.Value, e.Value) {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload updated account value equal expected.\nactual:%+v\nexpected: %+v", actualUpdatedAccount.Value, e.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store storage keys for this updatedAccount in a map to avoid errors from order
|
||||||
|
actualUpdatedStorageDiffs := make(map[string]StorageDiff)
|
||||||
|
for _, storageDiff := range actualUpdatedAccount.Storage {
|
||||||
|
actualUpdatedStorageDiffs[string(storageDiff.Key)] = storageDiff
|
||||||
|
}
|
||||||
|
for _, se := range e.Storage {
|
||||||
|
actualStorageDiff := actualUpdatedStorageDiffs[string(se.Key)]
|
||||||
|
|
||||||
|
if !bytes.Equal(actualStorageDiff.Key, se.Key) {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload updated account storage key equal expected.\nactual:%+v\nexpected: %+v", actualStorageDiff.Key, se.Key)
|
||||||
|
}
|
||||||
|
if !bytes.Equal(actualStorageDiff.Value, se.Value) {
|
||||||
|
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
|
||||||
|
t.Logf("Actual payload updated account storage value equal expected.\nactual:%+v\nexpected: %+v", actualStorageDiff.Value, se.Value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAccountDiff(accountAddress common.Address, modifedAccount state.ModifiedAccount, t *testing.T) AccountDiff {
|
||||||
|
accountRlp, accountRlpErr := rlp.EncodeToBytes(modifedAccount.Account)
|
||||||
|
if accountRlpErr != nil {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Logf("Failed to encode account diff")
|
||||||
|
}
|
||||||
|
|
||||||
|
var storageDiffs []StorageDiff
|
||||||
|
for key, value := range modifedAccount.Storage {
|
||||||
|
storageValueRlp, storageRlpErr := rlp.EncodeToBytes(value)
|
||||||
|
if storageRlpErr != nil {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Logf("Failed to encode storgae diff")
|
||||||
|
}
|
||||||
|
storageDiff := StorageDiff{
|
||||||
|
Key: key[:],
|
||||||
|
Value: storageValueRlp,
|
||||||
|
}
|
||||||
|
|
||||||
|
storageDiffs = append(storageDiffs, storageDiff)
|
||||||
|
}
|
||||||
|
|
||||||
|
return AccountDiff{
|
||||||
|
Key: accountAddress[:],
|
||||||
|
Value: accountRlp,
|
||||||
|
Storage: storageDiffs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getExpectedPayloads(blocks []*types.Block, updatedAccounts []AccountDiff, t *testing.T) []Payload {
|
||||||
|
var payloads []Payload
|
||||||
|
for _, block := range blocks {
|
||||||
|
expectedStateDiff := StateDiff{
|
||||||
|
BlockNumber: block.Number(),
|
||||||
|
BlockHash: block.Hash(),
|
||||||
|
UpdatedAccounts: updatedAccounts,
|
||||||
|
}
|
||||||
|
expectedStateDiffRLP, err := rlp.EncodeToBytes(expectedStateDiff)
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Test failure:", t.Name())
|
||||||
|
t.Logf("Failed to encode state diff to bytes")
|
||||||
|
return payloads
|
||||||
|
}
|
||||||
|
payloads = append(payloads, Payload{StateDiffRlp: expectedStateDiffRLP})
|
||||||
|
}
|
||||||
|
return payloads
|
||||||
|
}
|
||||||
|
113
eth/filters/statediff.go
Normal file
113
eth/filters/statediff.go
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
package filters
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/big"
|
||||||
|
"reflect"
|
||||||
|
|
||||||
|
"github.com/ethereum/go-ethereum"
|
||||||
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
"github.com/ethereum/go-ethereum/core"
|
||||||
|
"github.com/ethereum/go-ethereum/core/state"
|
||||||
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
|
)
|
||||||
|
|
||||||
|
var emptyPayload Payload
|
||||||
|
|
||||||
|
// processStateChanges builds the state diff Payload from the modified accounts in the StateChangeEvent
|
||||||
|
func processStateChanges(event core.StateChangeEvent, crit ethereum.FilterQuery) (Payload, error) {
|
||||||
|
var accountDiffs []AccountDiff
|
||||||
|
block := event.Block
|
||||||
|
// Iterate over state changes to build AccountDiffs
|
||||||
|
for addr, modifiedAccount := range event.StateChanges {
|
||||||
|
if len(crit.Addresses) > 0 && !includes(crit.Addresses, addr) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
a, err := buildAccountDiff(addr, modifiedAccount)
|
||||||
|
if err != nil {
|
||||||
|
return emptyPayload, err
|
||||||
|
}
|
||||||
|
|
||||||
|
accountDiffs = append(accountDiffs, a)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(accountDiffs) == 0 {
|
||||||
|
return emptyPayload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
stateDiff := StateDiff{
|
||||||
|
BlockNumber: block.Number(),
|
||||||
|
BlockHash: block.Hash(),
|
||||||
|
UpdatedAccounts: accountDiffs,
|
||||||
|
}
|
||||||
|
|
||||||
|
stateDiffRlp, err := rlp.EncodeToBytes(stateDiff)
|
||||||
|
if err != nil {
|
||||||
|
return emptyPayload, err
|
||||||
|
}
|
||||||
|
payload := Payload{
|
||||||
|
StateDiffRlp: stateDiffRlp,
|
||||||
|
}
|
||||||
|
|
||||||
|
return payload, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildAccountDiff
|
||||||
|
func buildAccountDiff(addr common.Address, modifiedAccount state.ModifiedAccount) (AccountDiff, error) {
|
||||||
|
emptyAccountDiff := AccountDiff{}
|
||||||
|
accountBytes, err := rlp.EncodeToBytes(modifiedAccount.Account)
|
||||||
|
if err != nil {
|
||||||
|
return emptyAccountDiff, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var storageDiffs []StorageDiff
|
||||||
|
for k, v := range modifiedAccount.Storage {
|
||||||
|
// Storage diff value should be an RLP object too
|
||||||
|
encodedValueRlp, err := rlp.EncodeToBytes(v[:])
|
||||||
|
if err != nil {
|
||||||
|
return emptyAccountDiff, err
|
||||||
|
}
|
||||||
|
storageKey := k
|
||||||
|
diff := StorageDiff{
|
||||||
|
Key: storageKey[:],
|
||||||
|
Value: encodedValueRlp,
|
||||||
|
}
|
||||||
|
storageDiffs = append(storageDiffs, diff)
|
||||||
|
}
|
||||||
|
|
||||||
|
address := addr
|
||||||
|
return AccountDiff{
|
||||||
|
Key: address[:],
|
||||||
|
Value: accountBytes,
|
||||||
|
Storage: storageDiffs,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isPayloadEmpty(payload Payload) bool {
|
||||||
|
return reflect.DeepEqual(payload, emptyPayload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Payload packages the data to send to statediff subscriptions
|
||||||
|
type Payload struct {
|
||||||
|
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StateDiff is the final output structure from the builder
|
||||||
|
type StateDiff struct {
|
||||||
|
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
|
||||||
|
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
|
||||||
|
UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AccountDiff holds the data for a single state diff node
|
||||||
|
type AccountDiff struct {
|
||||||
|
Key []byte `json:"key" gencodec:"required"`
|
||||||
|
Value []byte `json:"value" gencodec:"required"`
|
||||||
|
Storage []StorageDiff `json:"storage" gencodec:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// StorageDiff holds the data for a single storage diff node
|
||||||
|
type StorageDiff struct {
|
||||||
|
Key []byte `json:"key" gencodec:"required"`
|
||||||
|
Value []byte `json:"value" gencodec:"required"`
|
||||||
|
}
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||||
"github.com/ethereum/go-ethereum/core/types"
|
"github.com/ethereum/go-ethereum/core/types"
|
||||||
|
"github.com/ethereum/go-ethereum/eth/filters"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
"github.com/ethereum/go-ethereum/rpc"
|
||||||
)
|
)
|
||||||
@ -371,6 +372,15 @@ func (ec *Client) NonceAt(ctx context.Context, account common.Address, blockNumb
|
|||||||
return uint64(result), err
|
return uint64(result), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeNewStateChanges subscribes to notifications about the state change events on the given channel.
|
||||||
|
func (ec *Client) SubscribeNewStateChanges(ctx context.Context, q ethereum.FilterQuery, ch chan<- filters.Payload) (ethereum.Subscription, error) {
|
||||||
|
arg, err := toFilterArg(q)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ec.c.EthSubscribe(ctx, ch, "newStateChanges", arg)
|
||||||
|
}
|
||||||
|
|
||||||
// Filters
|
// Filters
|
||||||
|
|
||||||
// FilterLogs executes a filter query.
|
// FilterLogs executes a filter query.
|
||||||
|
@ -63,6 +63,7 @@ type Backend interface {
|
|||||||
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
|
||||||
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
|
||||||
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
|
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
|
||||||
|
SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription
|
||||||
|
|
||||||
// Transaction pool API
|
// Transaction pool API
|
||||||
SendTx(ctx context.Context, signedTx *types.Transaction) error
|
SendTx(ctx context.Context, signedTx *types.Transaction) error
|
||||||
|
@ -234,6 +234,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
|
|||||||
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
|
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *LesApiBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
|
||||||
|
return b.eth.blockchain.SubscribeStateChangeEvent(ch)
|
||||||
|
}
|
||||||
|
|
||||||
func (b *LesApiBackend) Downloader() *downloader.Downloader {
|
func (b *LesApiBackend) Downloader() *downloader.Downloader {
|
||||||
return b.eth.Downloader()
|
return b.eth.Downloader()
|
||||||
}
|
}
|
||||||
|
@ -557,6 +557,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
|
|||||||
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SubscribeStateChangeEvent implements the interface of filters.Backend
|
||||||
|
// LightChain does not send core.StateChangeEvent, so return an empty subscription.
|
||||||
|
func (lc *LightChain) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
|
||||||
|
return lc.scope.Track(new(event.Feed).Subscribe(ch))
|
||||||
|
}
|
||||||
|
|
||||||
// DisableCheckFreq disables header validation. This is used for ultralight mode.
|
// DisableCheckFreq disables header validation. This is used for ultralight mode.
|
||||||
func (lc *LightChain) DisableCheckFreq() {
|
func (lc *LightChain) DisableCheckFreq() {
|
||||||
atomic.StoreInt32(&lc.disableCheckFreq, 1)
|
atomic.StoreInt32(&lc.disableCheckFreq, 1)
|
||||||
|
@ -218,7 +218,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Commit and re-open to start with a clean state.
|
// Commit and re-open to start with a clean state.
|
||||||
root, _ := statedb.Commit(false)
|
root, _, _ := statedb.Commit(false)
|
||||||
|
|
||||||
var snaps *snapshot.Tree
|
var snaps *snapshot.Tree
|
||||||
if snapshotter {
|
if snapshotter {
|
||||||
|
Loading…
Reference in New Issue
Block a user