Cleanup statedb test #23

Closed
paytonrules wants to merge 14 commits from cleanup-statedb-test into master
24 changed files with 791 additions and 183 deletions

View File

@ -166,77 +166,6 @@ jobs:
- go run build/ci.go xgo --alltools -- --targets=linux/mips64le --ldflags '-extldflags "-static"' -v
- for bin in build/bin/*-linux-mips64le; do mv -f "${bin}" "${bin/-linux-mips64le/}"; done
- go run build/ci.go archive -arch mips64le -type tar -signer LINUX_SIGNING_KEY -upload gethstore/builds
# This builder does the Android Maven and Azure uploads
- stage: build
if: type = push
os: linux
dist: xenial
addons:
apt:
packages:
- oracle-java8-installer
- oracle-java8-set-default
language: android
android:
components:
- platform-tools
- tools
- android-15
- android-19
- android-24
env:
- azure-android
- maven-android
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
before_install:
- curl https://dl.google.com/go/go1.14.2.linux-amd64.tar.gz | tar -xz
- export PATH=`pwd`/go/bin:$PATH
- export GOROOT=`pwd`/go
- export GOPATH=$HOME/go
script:
# Build the Android archive and upload it to Maven Central and Azure
- curl https://dl.google.com/android/repository/android-ndk-r19b-linux-x86_64.zip -o android-ndk-r19b.zip
- unzip -q android-ndk-r19b.zip && rm android-ndk-r19b.zip
- mv android-ndk-r19b $ANDROID_HOME/ndk-bundle
- mkdir -p $GOPATH/src/github.com/ethereum
- ln -s `pwd` $GOPATH/src/github.com/ethereum/go-ethereum
- go run build/ci.go aar -signer ANDROID_SIGNING_KEY -deploy https://oss.sonatype.org -upload gethstore/builds
# This builder does the OSX Azure, iOS CocoaPods and iOS Azure uploads
- stage: build
if: type = push
os: osx
go: 1.14.x
env:
- azure-osx
- azure-ios
- cocoapods-ios
- GO111MODULE=on
git:
submodules: false # avoid cloning ethereum/tests
script:
- go run build/ci.go install
- go run build/ci.go archive -type tar -signer OSX_SIGNING_KEY -upload gethstore/builds
# Build the iOS framework and upload it to CocoaPods and Azure
- gem uninstall cocoapods -a -x
- gem install cocoapods
- mv ~/.cocoapods/repos/master ~/.cocoapods/repos/master.bak
- sed -i '.bak' 's/repo.join/!repo.join/g' $(dirname `gem which cocoapods`)/cocoapods/sources_manager.rb
- if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then git clone --depth=1 https://github.com/CocoaPods/Specs.git ~/.cocoapods/repos/master && pod setup --verbose; fi
- xctool -version
- xcrun simctl list
# Workaround for https://github.com/golang/go/issues/23749
- export CGO_CFLAGS_ALLOW='-fmodules|-fblocks|-fobjc-arc'
- go run build/ci.go xcode -signer IOS_SIGNING_KEY -deploy trunk -upload gethstore/builds
# This builder does the Azure archive purges to avoid accumulating junk
- stage: build
if: type = cron

View File

@ -777,6 +777,10 @@ func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event
return nullSubscription()
}
func (fb *filterBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
return nullSubscription()
}
func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 }
func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) {

View File

@ -686,7 +686,7 @@ func (api *RetestethAPI) AccountRange(ctx context.Context,
root = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
if idx == int(txIndex) {
// This is to make sure root can be opened by OpenTrie
root, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
root, _, err = statedb.Commit(api.chainConfig.IsEIP158(block.Number()))
if err != nil {
return AccountRangeResult{}, err
}
@ -796,7 +796,7 @@ func (api *RetestethAPI) StorageRangeAt(ctx context.Context,
_ = statedb.IntermediateRoot(vmenv.ChainConfig().IsEIP158(block.Number()))
if idx == int(txIndex) {
// This is to make sure root can be opened by OpenTrie
_, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
_, _, err = statedb.Commit(vmenv.ChainConfig().IsEIP158(block.Number()))
if err != nil {
return StorageRangeResult{}, err
}

View File

@ -155,15 +155,16 @@ type BlockChain struct {
// * nil: disable tx reindexer/deleter, but still index new blocks
txLookupLimit uint64
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
hc *HeaderChain
rmLogsFeed event.Feed
chainFeed event.Feed
chainSideFeed event.Feed
chainHeadFeed event.Feed
stateChangeEventFeed event.Feed
logsFeed event.Feed
blockProcFeed event.Feed
scope event.SubscriptionScope
genesisBlock *types.Block
chainmu sync.RWMutex // blockchain insertion lock
@ -1434,10 +1435,14 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
log.Crit("Failed to write block into disk", "err", err)
}
// Commit all cached state changes into underlying memory database.
root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return NonStatTy, err
root, stateChanges, commitErr := state.Commit(bc.chainConfig.IsEIP158(block.Number()))
log.Debug("Sending StateChangeEvent to the feed", "block number", block.Number(), "count", len(stateChanges))
bc.stateChangeEventFeed.Send(StateChangeEvent{block, stateChanges})
if commitErr != nil {
return NonStatTy, commitErr
}
triedb := bc.stateCache.TrieDB()
// If we're running an archive node, always flush
@ -2457,3 +2462,8 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
}
// SubscribeStateChangeEvent registers a subscription StateChangeEvent.
func (bc *BlockChain) SubscribeStateChangeEvent(ch chan<- StateChangeEvent) event.Subscription {
return bc.scope.Track(bc.stateChangeEventFeed.Subscribe(ch))
}

View File

@ -22,6 +22,7 @@ import (
"math/big"
"math/rand"
"os"
"reflect"
"sync"
"testing"
"time"
@ -3028,3 +3029,79 @@ func TestInitThenFailCreateContract(t *testing.T) {
}
}
}
func TestSendingStateChangeEvents(t *testing.T) { testSendingStateChangeEvents(t, 3) }
func testSendingStateChangeEvents(t *testing.T, numberOfEventsToSend int) {
var (
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
key2, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a")
addr2 = crypto.PubkeyToAddress(key2.PublicKey)
db = rawdb.NewMemoryDatabase()
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
genesis = gspec.MustCommit(db)
signer = types.NewEIP155Signer(gspec.Config.ChainID)
newEventCh = make(chan bool)
)
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil, nil)
defer blockchain.Stop()
stateChangeCh := make(chan StateChangeEvent)
blockchain.SubscribeStateChangeEvent(stateChangeCh)
// create numberOfEventsToSend blocks that include State Changes
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, numberOfEventsToSend, func(i int, block *BlockGen) {
tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(addr1), addr2, big.NewInt(10000), params.TxGas, nil, nil), signer, key1)
block.AddTx(tx)
})
// Spawn a goroutine to receive state change events
go validateEventsReceived(t, stateChangeCh, newEventCh, numberOfEventsToSend)
if _, err := blockchain.InsertChain(chain); err != nil {
t.Fatalf("failed to insert chain: %v", err)
}
if !<-newEventCh {
t.Fatal("failed to receive new event")
}
}
func validateEventsReceived(t *testing.T, sink interface{}, result chan bool, expect int) {
chanval := reflect.ValueOf(sink)
chantyp := chanval.Type()
if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.RecvDir == 0 {
t.Fatalf("invalid channel, given type %v", chantyp)
}
cnt := 0
var recv []reflect.Value
timeout := time.After(4 * time.Second)
cases := []reflect.SelectCase{{Chan: chanval, Dir: reflect.SelectRecv}, {Chan: reflect.ValueOf(timeout), Dir: reflect.SelectRecv}}
for {
chose, v, _ := reflect.Select(cases)
if chose == 1 {
t.Log("got a timeout")
// Not enough event received
result <- false
return
}
cnt += 1
recv = append(recv, v)
if cnt == expect {
t.Log("got an event")
break
}
}
done := time.After(50 * time.Millisecond)
cases = cases[:1]
cases = append(cases, reflect.SelectCase{Chan: reflect.ValueOf(done), Dir: reflect.SelectRecv})
chose, _, _ := reflect.Select(cases)
// If chose equal 0, it means receiving redundant events.
if chose == 1 {
t.Log("is done")
result <- true
} else {
t.Log("got too many events")
result <- false
}
}

View File

@ -216,7 +216,7 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
block, _ := b.engine.FinalizeAndAssemble(chainreader, b.header, statedb, b.txs, b.uncles, b.receipts)
// Write state changes to db
root, err := statedb.Commit(config.IsEIP158(b.header.Number))
root, _, err := statedb.Commit(config.IsEIP158(b.header.Number))
if err != nil {
panic(fmt.Sprintf("state write error: %v", err))
}

View File

@ -18,6 +18,7 @@ package core
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
)
@ -36,6 +37,11 @@ type ChainEvent struct {
Logs []*types.Log
}
type StateChangeEvent struct {
*types.Block
state.StateChanges
}
type ChainSideEvent struct {
Block *types.Block
}

View File

@ -81,6 +81,7 @@ type stateObject struct {
originStorage Storage // Storage cache of original entries to dedup rewrites, reset for every transaction
pendingStorage Storage // Storage entries that need to be flushed to disk, at the end of an entire block
diffStorage Storage // Storage entries that need to be emitted with stateDiffs
dirtyStorage Storage // Storage entries that have been modified in the current transaction execution
fakeStorage Storage // Fake storage which constructed by caller for debugging purpose.
@ -125,6 +126,7 @@ func newObject(db *StateDB, address common.Address, data Account) *stateObject {
originStorage: make(Storage),
pendingStorage: make(Storage),
dirtyStorage: make(Storage),
diffStorage: make(Storage),
}
}
@ -321,6 +323,7 @@ func (s *stateObject) updateTrie(db Database) Trie {
continue
}
s.originStorage[key] = value
s.diffStorage[key] = value
var v []byte
if (value == common.Hash{}) {
@ -423,6 +426,7 @@ func (s *stateObject) deepCopy(db *StateDB) *stateObject {
stateObject.dirtyStorage = s.dirtyStorage.Copy()
stateObject.originStorage = s.originStorage.Copy()
stateObject.pendingStorage = s.pendingStorage.Copy()
stateObject.diffStorage = s.diffStorage.Copy()
stateObject.suicided = s.suicided
stateObject.dirtyCode = s.dirtyCode
stateObject.deleted = s.deleted

View File

@ -167,7 +167,7 @@ func TestSnapshot2(t *testing.T) {
so0.deleted = false
state.setStateObject(so0)
root, _ := state.Commit(false)
root, _, _ := state.Commit(false)
state.Reset(root)
// and one with deleted == true

View File

@ -802,28 +802,52 @@ func (s *StateDB) clearJournalAndRefund() {
s.validRevisions = s.validRevisions[:0] // Snapshots can be created without journal entires
}
// ModifiedAccount is an Account and it's Storage trie that has changed in the current block.
type ModifiedAccount struct {
Account
Storage
}
// StateChanges are a map between an Account's address to it's ModifiedAccount.
type StateChanges map[common.Address]ModifiedAccount
// Commit writes the state to the underlying in-memory trie database.
func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, StateChanges, error) {
if s.dbErr != nil {
return common.Hash{}, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
return common.Hash{}, nil, fmt.Errorf("commit aborted due to earlier error: %v", s.dbErr)
}
// Finalize any pending changes and merge everything into the tries
s.IntermediateRoot(deleteEmptyObjects)
stateChanges := make(StateChanges)
// Commit objects to the trie, measuring the elapsed time
for addr := range s.stateObjectsDirty {
modifiedAccount := ModifiedAccount{}
if obj := s.stateObjects[addr]; !obj.deleted {
// Write any contract code associated with the state object
if obj.code != nil && obj.dirtyCode {
s.db.TrieDB().InsertBlob(common.BytesToHash(obj.CodeHash()), obj.code)
obj.dirtyCode = false
}
// Add the account to the modifiedAccounts map
modifiedAccount = ModifiedAccount{Account: obj.data}
// Add the diff storage to the modifiedAccounts map
modifiedAccount.Storage = obj.diffStorage
obj.diffStorage = make(Storage)
// Write any storage changes in the state object to its storage trie
if err := obj.CommitTrie(s.db); err != nil {
return common.Hash{}, err
return common.Hash{}, StateChanges{}, err
}
}
stateChanges[addr] = modifiedAccount
}
if len(s.stateObjectsDirty) > 0 {
s.stateObjectsDirty = make(map[common.Address]struct{})
}
@ -867,5 +891,6 @@ func (s *StateDB) Commit(deleteEmptyObjects bool) (common.Hash, error) {
}
s.snap, s.snapDestructs, s.snapAccounts, s.snapStorage = nil, nil, nil, nil
}
return root, err
return root, stateChanges, err
}

View File

@ -67,6 +67,109 @@ func TestUpdateLeaks(t *testing.T) {
it.Release()
}
func TestStateChangesEmittedFromCommit(t *testing.T) {
// Create an empty state database
db := rawdb.NewMemoryDatabase()
state, _ := New(common.Hash{}, NewDatabase(db), nil)
addr1 := common.BytesToAddress([]byte{1, 2, 3, 4})
addr2 := common.BytesToAddress([]byte{5, 6, 7, 8})
expectedModifiedAccount1 := getNewModifiedAccount()
expectedModifiedAccount2 := getNewModifiedAccount()
// Commit 1
// set balance and storage for addr1
addr1Balance := big.NewInt(rand.Int63())
state.SetBalance(addr1, addr1Balance)
expectedModifiedAccount1.Account.Balance = addr1Balance
storageKey := common.BytesToHash([]byte{1})
storageValue := common.BytesToHash([]byte{1, 2, 3})
state.SetState(addr1, storageKey, storageValue)
expectedModifiedAccount1.Storage[storageKey] = storageValue
// set balance for addr2
addr2Balance := big.NewInt(rand.Int63())
state.SetBalance(addr2, addr2Balance)
expectedModifiedAccount2.Account.Balance = addr2Balance
expectedStateChangesOne := make(StateChanges)
expectedStateChangesOne[addr1] = expectedModifiedAccount1
expectedStateChangesOne[addr2] = expectedModifiedAccount2
_, actualStateChangesOne, commitErr := state.Commit(false)
if commitErr != nil {
t.Errorf("error committing to statedb")
}
assertStateChanges(actualStateChangesOne, expectedStateChangesOne, t)
// Commit 2: Update existing accounts
expectedStateChangesTwo := make(StateChanges)
// update balance and storage for addr1
newBalanceToAdd := big.NewInt(100)
state.AddBalance(addr1, newBalanceToAdd)
newAddr1Balance := newBalanceToAdd.Int64() + addr1Balance.Int64()
expectedModifiedAccount1.Account.Balance = big.NewInt(newAddr1Balance)
updatedStorageValue := common.BytesToHash([]byte{0}) // setting storage value back to zero to make sure that we're capturing zero value diffs
state.SetState(addr1, storageKey, updatedStorageValue)
expectedModifiedAccount1.Storage[storageKey] = updatedStorageValue
expectedStateChangesTwo[addr1] = expectedModifiedAccount1
_, stateChangesTwo, commitTwoErr := state.Commit(false)
if commitTwoErr != nil {
t.Errorf("error committing to statedb")
}
assertStateChanges(stateChangesTwo, expectedStateChangesTwo, t)
}
func getNewModifiedAccount() ModifiedAccount {
return ModifiedAccount{
Storage: make(map[common.Hash]common.Hash),
Account: Account{
Nonce: 0,
Balance: big.NewInt(0),
Root: emptyRoot,
CodeHash: emptyCodeHash,
},
}
}
func assertStateChanges(actualChanges, expectedChanges StateChanges, t *testing.T) {
if len(actualChanges) != len(expectedChanges) {
t.Error("Test failure:", t.Name())
t.Logf("Mismatch in number of StateChanges.ModifiedAccounts. actual: %v, expected: %v", len(actualChanges), len(expectedChanges))
}
for addr, account := range actualChanges {
expected := expectedChanges[addr]
if !reflect.DeepEqual(account.Nonce, expected.Nonce) {
t.Error("Test failure:", t.Name())
t.Errorf("Account Nonce does not match expected. actual: %v, expected: %v", account.Nonce, expected.Nonce)
}
if !reflect.DeepEqual(account.Balance, expected.Balance) {
t.Error("Test failure:", t.Name())
t.Errorf("Account Balance does not match expected. actual: %v, expected: %v", account.Balance, expected.Balance)
}
if !reflect.DeepEqual(account.CodeHash, expected.CodeHash) {
t.Error("Test failure:", t.Name())
t.Errorf("Account CodeHash does not match expected. actual: %v, expected: %v", account.CodeHash, expected.CodeHash)
}
// Not asserting on the Account's Root because it's not easy to get the root between `SetState` calls
if !reflect.DeepEqual(account.Storage, expected.Storage) {
t.Error("Test failure:", t.Name())
t.Errorf("Account Storage does not match expected. actual: %v, expected: %v", account.Storage, expected.Storage)
}
}
}
// Tests that no intermediate state of an object is stored into the database,
// only the one right before the commit.
func TestIntermediateLeaks(t *testing.T) {
@ -102,7 +205,7 @@ func TestIntermediateLeaks(t *testing.T) {
}
// Commit and cross check the databases.
transRoot, err := transState.Commit(false)
transRoot, _, err := transState.Commit(false)
if err != nil {
t.Fatalf("failed to commit transition state: %v", err)
}
@ -110,7 +213,7 @@ func TestIntermediateLeaks(t *testing.T) {
t.Errorf("can not commit trie %v to persistent database", transRoot.Hex())
}
finalRoot, err := finalState.Commit(false)
finalRoot, _, err := finalState.Commit(false)
if err != nil {
t.Fatalf("failed to commit final state: %v", err)
}
@ -459,7 +562,7 @@ func (test *snapshotTest) checkEqual(state, checkstate *StateDB) error {
func TestTouchDelete(t *testing.T) {
s := newStateTest()
s.state.GetOrNewStateObject(common.Address{})
root, _ := s.state.Commit(false)
root, _, _ := s.state.Commit(false)
s.state.Reset(root)
snapshot := s.state.Snapshot()
@ -661,7 +764,7 @@ func TestDeleteCreateRevert(t *testing.T) {
addr := toAddr([]byte("so"))
state.SetBalance(addr, big.NewInt(1))
root, _ := state.Commit(false)
root, _, _ := state.Commit(false)
state.Reset(root)
// Simulate self-destructing in one transaction, then create-reverting in another
@ -673,7 +776,7 @@ func TestDeleteCreateRevert(t *testing.T) {
state.RevertToSnapshot(id)
// Commit the entire state and make sure we don't crash and have the correct state
root, _ = state.Commit(true)
root, _, _ = state.Commit(true)
state.Reset(root)
if state.getStateObject(addr) != nil {
@ -698,7 +801,7 @@ func TestMissingTrieNodes(t *testing.T) {
a2 := toAddr([]byte("another"))
state.SetBalance(a2, big.NewInt(100))
state.SetCode(a2, []byte{1, 2, 4})
root, _ = state.Commit(false)
root, _, _ = state.Commit(false)
t.Logf("root: %x", root)
// force-flush
state.Database().TrieDB().Cap(0)
@ -722,7 +825,7 @@ func TestMissingTrieNodes(t *testing.T) {
}
// Modify the state
state.SetBalance(addr, big.NewInt(2))
root, err := state.Commit(false)
root, _, err := state.Commit(false)
if err == nil {
t.Fatalf("expected error, got root :%x", root)
}

View File

@ -62,7 +62,7 @@ func makeTestState() (Database, common.Hash, []*testAccount) {
state.updateStateObject(obj)
accounts = append(accounts, acc)
}
root, _ := state.Commit(false)
root, _, _ := state.Commit(false)
// Return the generated state
return db, root, accounts

View File

@ -220,6 +220,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}
func (b *EthAPIBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
return b.eth.BlockChain().SubscribeStateChangeEvent(ch)
}
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
}

View File

@ -295,7 +295,7 @@ func (api *PrivateDebugAPI) traceChain(ctx context.Context, start, end *types.Bl
break
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
if err != nil {
failed = err
break
@ -681,7 +681,7 @@ func (api *PrivateDebugAPI) computeStateDB(block *types.Block, reexec uint64) (*
return nil, fmt.Errorf("processing block %d failed: %v", block.NumberU64(), err)
}
// Finalize the state so any modifications are written to the trie
root, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
root, _, err := statedb.Commit(api.eth.blockchain.Config().IsEIP158(block.Number()))
if err != nil {
return nil, err
}

View File

@ -25,7 +25,7 @@ import (
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
@ -233,6 +233,35 @@ func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, er
return rpcSub, nil
}
func (api *PublicFilterAPI) NewStateChanges(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
stateChanges := make(chan Payload)
stateChangeSub := api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges)
for {
select {
case s := <-stateChanges:
notifier.Notify(rpcSub.ID, s)
case <-rpcSub.Err():
stateChangeSub.Unsubscribe()
return
case <-notifier.Closed():
stateChangeSub.Unsubscribe()
return
}
}
}()
return rpcSub, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)

View File

@ -42,6 +42,7 @@ type Backend interface {
SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription
SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription
BloomStatus() (uint64, uint64)
ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)

View File

@ -24,7 +24,7 @@ import (
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
@ -52,6 +52,8 @@ const (
PendingTransactionsSubscription
// BlocksSubscription queries hashes for blocks that are imported
BlocksSubscription
// StateChangeSubscription queries for state changes between blocks being imported
StateChangeSubscription
// LastSubscription keeps track of the last index
LastIndexSubscription
)
@ -66,18 +68,21 @@ const (
logsChanSize = 10
// chainEvChanSize is the size of channel listening to ChainEvent.
chainEvChanSize = 10
// stateChangeChanSize is the size of the channel listening to StateChangeEvent.
stateChangeChanSize = 10
)
type subscription struct {
id rpc.ID
typ Type
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
id rpc.ID
typ Type
created time.Time
filterCrit ethereum.FilterQuery
logs chan []*types.Log
hashes chan []common.Hash
headers chan *types.Header
stateChangePayloads chan Payload
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
}
// EventSystem creates subscriptions, processes events and broadcasts them to the
@ -88,20 +93,22 @@ type EventSystem struct {
lastHead *types.Header
// Subscriptions
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
txsSub event.Subscription // Subscription for new transaction event
logsSub event.Subscription // Subscription for new log event
rmLogsSub event.Subscription // Subscription for removed log event
pendingLogsSub event.Subscription // Subscription for pending log event
chainSub event.Subscription // Subscription for new chain event
stateChangeEventSub event.Subscription // Subscription for new state change event
// Channels
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
install chan *subscription // install filter for event notification
uninstall chan *subscription // remove filter for event notification
txsCh chan core.NewTxsEvent // Channel to receive new transactions event
logsCh chan []*types.Log // Channel to receive new log event
pendingLogsCh chan []*types.Log // Channel to receive new log event
rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
chainCh chan core.ChainEvent // Channel to receive new chain event
stateChangeEventChan chan core.StateChangeEvent // Channel to receive new state change event
}
// NewEventSystem creates a new manager that listens for event on the given mux,
@ -112,15 +119,16 @@ type EventSystem struct {
// or by stopping the given mux.
func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m := &EventSystem{
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
backend: backend,
lightMode: lightMode,
install: make(chan *subscription),
uninstall: make(chan *subscription),
txsCh: make(chan core.NewTxsEvent, txChanSize),
logsCh: make(chan []*types.Log, logsChanSize),
rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
pendingLogsCh: make(chan []*types.Log, logsChanSize),
chainCh: make(chan core.ChainEvent, chainEvChanSize),
stateChangeEventChan: make(chan core.StateChangeEvent, stateChangeChanSize),
}
// Subscribe events
@ -129,9 +137,10 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem {
m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh)
m.stateChangeEventSub = m.backend.SubscribeStateChangeEvent(m.stateChangeEventChan)
// Make sure none of the subscriptions are empty
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil {
if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.stateChangeEventSub == nil {
log.Crit("Subscribe for event system failed")
}
@ -167,6 +176,7 @@ func (sub *Subscription) Unsubscribe() {
case <-sub.f.logs:
case <-sub.f.hashes:
case <-sub.f.headers:
case <-sub.f.stateChangePayloads:
}
}
@ -227,15 +237,16 @@ func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// pending logs that match the given criteria.
func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: MinedAndPendingLogsSubscription,
filterCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateChangePayloads: make(chan Payload),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -244,15 +255,16 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: LogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: LogsSubscription,
filterCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateChangePayloads: make(chan Payload),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -261,15 +273,16 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
// transactions that enter the transaction pool.
func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingLogsSubscription,
logsCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingLogsSubscription,
filterCrit: crit,
created: time.Now(),
logs: logs,
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateChangePayloads: make(chan Payload),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -278,14 +291,15 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
// imported in the chain.
func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: headers,
stateChangePayloads: make(chan Payload),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -294,14 +308,33 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: hashes,
headers: make(chan *types.Header),
stateChangePayloads: make(chan Payload),
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
// SubscribeStateChanges creates a subscription that writes new state changes that are identified
// for each new block.
func (es *EventSystem) SubscribeStateChanges(crit ethereum.FilterQuery, stateChanges chan Payload) *Subscription {
sub := &subscription{
id: rpc.NewID(),
typ: StateChangeSubscription,
filterCrit: crit,
created: time.Now(),
logs: make(chan []*types.Log),
hashes: make(chan []common.Hash),
headers: make(chan *types.Header),
stateChangePayloads: stateChanges,
installed: make(chan struct{}),
err: make(chan error),
}
return es.subscribe(sub)
}
@ -313,7 +346,7 @@ func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) {
return
}
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
matchedLogs := filterLogs(ev, f.filterCrit.FromBlock, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
@ -325,7 +358,7 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
return
}
for _, f := range filters[PendingLogsSubscription] {
matchedLogs := filterLogs(ev, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
matchedLogs := filterLogs(ev, nil, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
@ -334,7 +367,7 @@ func (es *EventSystem) handlePendingLogs(filters filterIndex, ev []*types.Log) {
func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLogsEvent) {
for _, f := range filters[LogsSubscription] {
matchedLogs := filterLogs(ev.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics)
matchedLogs := filterLogs(ev.Logs, f.filterCrit.FromBlock, f.filterCrit.ToBlock, f.filterCrit.Addresses, f.filterCrit.Topics)
if len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
@ -358,7 +391,7 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
if es.lightMode && len(filters[LogsSubscription]) > 0 {
es.lightFilterNewHead(ev.Block.Header(), func(header *types.Header, remove bool) {
for _, f := range filters[LogsSubscription] {
if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
if matchedLogs := es.lightFilterLogs(header, f.filterCrit.Addresses, f.filterCrit.Topics, remove); len(matchedLogs) > 0 {
f.logs <- matchedLogs
}
}
@ -366,6 +399,20 @@ func (es *EventSystem) handleChainEvent(filters filterIndex, ev core.ChainEvent)
}
}
func (es *EventSystem) handleStateChangeEvent(filters filterIndex, ev core.StateChangeEvent) {
for _, f := range filters[StateChangeSubscription] {
payload, processingErr := processStateChanges(ev, f.filterCrit)
if processingErr != nil {
f.err <- processingErr
}
empty := isPayloadEmpty(payload)
if !empty {
f.stateChangePayloads <- payload
}
}
}
func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
oldh := es.lastHead
es.lastHead = newHeader
@ -448,6 +495,7 @@ func (es *EventSystem) eventLoop() {
es.rmLogsSub.Unsubscribe()
es.pendingLogsSub.Unsubscribe()
es.chainSub.Unsubscribe()
es.stateChangeEventSub.Unsubscribe()
}()
index := make(filterIndex)
@ -467,6 +515,8 @@ func (es *EventSystem) eventLoop() {
es.handlePendingLogs(index, ev)
case ev := <-es.chainCh:
es.handleChainEvent(index, ev)
case ev := <-es.stateChangeEventChan:
es.handleStateChangeEvent(index, ev)
case f := <-es.install:
if f.typ == MinedAndPendingLogsSubscription {
@ -497,6 +547,8 @@ func (es *EventSystem) eventLoop() {
return
case <-es.chainSub.Err():
return
case <-es.stateChangeEventSub.Err():
return
}
}
}

View File

@ -17,6 +17,7 @@
package filters
import (
"bytes"
"context"
"fmt"
"math/big"
@ -25,16 +26,18 @@ import (
"testing"
"time"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)
@ -47,6 +50,7 @@ type testBackend struct {
rmLogsFeed event.Feed
pendingLogsFeed event.Feed
chainFeed event.Feed
stateChangeFeed event.Feed
}
func (b *testBackend) ChainDb() ethdb.Database {
@ -121,6 +125,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc
return b.chainFeed.Subscribe(ch)
}
func (b *testBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
return b.stateChangeFeed.Subscribe(ch)
}
func (b *testBackend) BloomStatus() (uint64, uint64) {
return params.BloomBitsBlocks, b.sections
}
@ -608,3 +616,225 @@ func flattenLogs(pl [][]*types.Log) []*types.Log {
}
return logs
}
func TestStateChangeSubscription(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
backend = &testBackend{db: db}
api = NewPublicFilterAPI(backend, false)
genesis = new(core.Genesis).MustCommit(db)
numberOfBlocks = 3
chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, numberOfBlocks, func(i int, gen *core.BlockGen) {})
address1 = common.HexToAddress("0x1")
address2 = common.HexToAddress("0x2")
modifiedAccount1 = state.ModifiedAccount{
Account: state.Account{
Nonce: 0,
Balance: big.NewInt(100),
Root: common.HexToHash("0x01"),
},
Storage: nil,
}
modifiedAccount2 = state.ModifiedAccount{
Account: state.Account{
Nonce: 0,
Balance: big.NewInt(200),
Root: common.HexToHash("0x02"),
},
Storage: state.Storage{common.HexToHash("0x0002"): common.HexToHash("0x00002")},
}
accountDiff1 = getAccountDiff(address1, modifiedAccount1, t)
accountDiff2 = getAccountDiff(address2, modifiedAccount2, t)
block0 = chain[0]
block1 = chain[1]
block2 = chain[2]
emptyStateChangeEvent = core.StateChangeEvent{Block: block0}
blockOneStateChangeEvent = core.StateChangeEvent{
Block: block1,
StateChanges: state.StateChanges{
address1: modifiedAccount1,
address2: modifiedAccount2,
},
}
blockTwoStateChangeEvent = core.StateChangeEvent{
Block: block2,
StateChanges: state.StateChanges{
address1: modifiedAccount1,
address2: modifiedAccount2,
},
}
testCases = []struct {
description string
stateChangeEvents []core.StateChangeEvent
expectedPayloads []Payload
crit ethereum.FilterQuery
}{
{
"when block 0 has no state changes no payload for that block is returned",
[]core.StateChangeEvent{emptyStateChangeEvent, blockOneStateChangeEvent, blockTwoStateChangeEvent},
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1, accountDiff2}, t),
ethereum.FilterQuery{},
},
{
"when watching state changes from specific contracts",
[]core.StateChangeEvent{blockOneStateChangeEvent, blockTwoStateChangeEvent, emptyStateChangeEvent},
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1}, t),
ethereum.FilterQuery{Addresses: []common.Address{address1}},
},
{
"when no specific addresses are configured all state changes are returned",
[]core.StateChangeEvent{blockOneStateChangeEvent, blockTwoStateChangeEvent, emptyStateChangeEvent},
getExpectedPayloads([]*types.Block{block1, block2}, []AccountDiff{accountDiff1, accountDiff2}, t),
ethereum.FilterQuery{},
},
}
)
for _, test := range testCases {
chan0 := make(chan Payload)
sub0 := api.events.SubscribeStateChanges(test.crit, chan0)
payloads := make([]Payload, 0, len(test.expectedPayloads))
go func() {
// simulate client
// keep this loop open for numberOfBlocks - 1 because the first block has no StateChangeEvents and should
// not send an event to the client via the subscription
for index := 0; index < numberOfBlocks-1; index++ {
payload := <-chan0
payloads = append(payloads, payload)
}
sub0.Unsubscribe()
}()
time.Sleep(1 * time.Second)
// send the state change events to the subscriber
for _, e := range test.stateChangeEvents {
backend.stateChangeFeed.Send(e)
}
<-sub0.Err()
expectedNumberOfPayloads := len(test.expectedPayloads)
if len(payloads) != expectedNumberOfPayloads {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual number of payloads does not equal expected.\nactual: %+v\nexpected: %+v", len(payloads), expectedNumberOfPayloads)
}
for index, payload := range payloads {
var actualStateDiff, expectedStateDiff StateDiff
actualRLP := payload.StateDiffRlp
err := rlp.DecodeBytes(actualRLP, &actualStateDiff)
if err != nil {
t.Errorf("error decoding state diff RLP into state diff: %s: %s", err, test.description)
}
expectedRLP := test.expectedPayloads[index].StateDiffRlp
err = rlp.DecodeBytes(expectedRLP, &expectedStateDiff)
if err != nil {
t.Errorf("error decoding state diff RLP into state diff: %s: %s", err, test.description)
}
if expectedStateDiff.BlockNumber.Cmp(actualStateDiff.BlockNumber) != 0 {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload block number equal expected.\nactual:%+v\nexpected: %+v", actualStateDiff.BlockNumber, expectedStateDiff.BlockNumber)
}
if expectedStateDiff.BlockHash != actualStateDiff.BlockHash {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload block hash equal expected.\nactual:%+v\nexpected: %+v", actualStateDiff.BlockHash, expectedStateDiff.BlockHash)
}
// The updated accounts are not necessarily in the same order for the state diffs.
// Put the actual in a map for easier lookup.
actualUpdatedAccounts := make(map[string]AccountDiff)
for _, account := range actualStateDiff.UpdatedAccounts {
actualUpdatedAccounts[string(account.Key)] = account
}
for _, e := range expectedStateDiff.UpdatedAccounts {
actualUpdatedAccount := actualUpdatedAccounts[string(e.Key)]
if !bytes.Equal(actualUpdatedAccount.Key, e.Key) {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload updated account key equal expected.\nactual:%+v\nexpected: %+v", actualUpdatedAccount.Key, e.Key)
}
if !bytes.Equal(actualUpdatedAccount.Value, e.Value) {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload updated account value equal expected.\nactual:%+v\nexpected: %+v", actualUpdatedAccount.Value, e.Value)
}
// Store storage keys for this updatedAccount in a map to avoid errors from order
actualUpdatedStorageDiffs := make(map[string]StorageDiff)
for _, storageDiff := range actualUpdatedAccount.Storage {
actualUpdatedStorageDiffs[string(storageDiff.Key)] = storageDiff
}
for _, se := range e.Storage {
actualStorageDiff := actualUpdatedStorageDiffs[string(se.Key)]
if !bytes.Equal(actualStorageDiff.Key, se.Key) {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload updated account storage key equal expected.\nactual:%+v\nexpected: %+v", actualStorageDiff.Key, se.Key)
}
if !bytes.Equal(actualStorageDiff.Value, se.Value) {
t.Errorf("Test failure: %s: %s", t.Name(), test.description)
t.Logf("Actual payload updated account storage value equal expected.\nactual:%+v\nexpected: %+v", actualStorageDiff.Value, se.Value)
}
}
}
}
}
}
func getAccountDiff(accountAddress common.Address, modifedAccount state.ModifiedAccount, t *testing.T) AccountDiff {
accountRlp, accountRlpErr := rlp.EncodeToBytes(modifedAccount.Account)
if accountRlpErr != nil {
t.Error("Test failure:", t.Name())
t.Logf("Failed to encode account diff")
}
var storageDiffs []StorageDiff
for key, value := range modifedAccount.Storage {
storageValueRlp, storageRlpErr := rlp.EncodeToBytes(value)
if storageRlpErr != nil {
t.Error("Test failure:", t.Name())
t.Logf("Failed to encode storgae diff")
}
storageDiff := StorageDiff{
Key: key[:],
Value: storageValueRlp,
}
storageDiffs = append(storageDiffs, storageDiff)
}
return AccountDiff{
Key: accountAddress[:],
Value: accountRlp,
Storage: storageDiffs,
}
}
func getExpectedPayloads(blocks []*types.Block, updatedAccounts []AccountDiff, t *testing.T) []Payload {
var payloads []Payload
for _, block := range blocks {
expectedStateDiff := StateDiff{
BlockNumber: block.Number(),
BlockHash: block.Hash(),
UpdatedAccounts: updatedAccounts,
}
expectedStateDiffRLP, err := rlp.EncodeToBytes(expectedStateDiff)
if err != nil {
t.Error("Test failure:", t.Name())
t.Logf("Failed to encode state diff to bytes")
return payloads
}
payloads = append(payloads, Payload{StateDiffRlp: expectedStateDiffRLP})
}
return payloads
}

113
eth/filters/statediff.go Normal file
View File

@ -0,0 +1,113 @@
package filters
import (
"math/big"
"reflect"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/rlp"
)
var emptyPayload Payload
// processStateChanges builds the state diff Payload from the modified accounts in the StateChangeEvent
func processStateChanges(event core.StateChangeEvent, crit ethereum.FilterQuery) (Payload, error) {
var accountDiffs []AccountDiff
block := event.Block
// Iterate over state changes to build AccountDiffs
for addr, modifiedAccount := range event.StateChanges {
if len(crit.Addresses) > 0 && !includes(crit.Addresses, addr) {
continue
}
a, err := buildAccountDiff(addr, modifiedAccount)
if err != nil {
return emptyPayload, err
}
accountDiffs = append(accountDiffs, a)
}
if len(accountDiffs) == 0 {
return emptyPayload, nil
}
stateDiff := StateDiff{
BlockNumber: block.Number(),
BlockHash: block.Hash(),
UpdatedAccounts: accountDiffs,
}
stateDiffRlp, err := rlp.EncodeToBytes(stateDiff)
if err != nil {
return emptyPayload, err
}
payload := Payload{
StateDiffRlp: stateDiffRlp,
}
return payload, nil
}
// buildAccountDiff
func buildAccountDiff(addr common.Address, modifiedAccount state.ModifiedAccount) (AccountDiff, error) {
emptyAccountDiff := AccountDiff{}
accountBytes, err := rlp.EncodeToBytes(modifiedAccount.Account)
if err != nil {
return emptyAccountDiff, err
}
var storageDiffs []StorageDiff
for k, v := range modifiedAccount.Storage {
// Storage diff value should be an RLP object too
encodedValueRlp, err := rlp.EncodeToBytes(v[:])
if err != nil {
return emptyAccountDiff, err
}
storageKey := k
diff := StorageDiff{
Key: storageKey[:],
Value: encodedValueRlp,
}
storageDiffs = append(storageDiffs, diff)
}
address := addr
return AccountDiff{
Key: address[:],
Value: accountBytes,
Storage: storageDiffs,
}, nil
}
func isPayloadEmpty(payload Payload) bool {
return reflect.DeepEqual(payload, emptyPayload)
}
// Payload packages the data to send to statediff subscriptions
type Payload struct {
StateDiffRlp []byte `json:"stateDiff" gencodec:"required"`
}
// StateDiff is the final output structure from the builder
type StateDiff struct {
BlockNumber *big.Int `json:"blockNumber" gencodec:"required"`
BlockHash common.Hash `json:"blockHash" gencodec:"required"`
UpdatedAccounts []AccountDiff `json:"updatedAccounts" gencodec:"required"`
}
// AccountDiff holds the data for a single state diff node
type AccountDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
Storage []StorageDiff `json:"storage" gencodec:"required"`
}
// StorageDiff holds the data for a single storage diff node
type StorageDiff struct {
Key []byte `json:"key" gencodec:"required"`
Value []byte `json:"value" gencodec:"required"`
}

View File

@ -28,6 +28,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
)
@ -371,6 +372,15 @@ func (ec *Client) NonceAt(ctx context.Context, account common.Address, blockNumb
return uint64(result), err
}
// SubscribeNewStateChanges subscribes to notifications about the state change events on the given channel.
func (ec *Client) SubscribeNewStateChanges(ctx context.Context, q ethereum.FilterQuery, ch chan<- filters.Payload) (ethereum.Subscription, error) {
arg, err := toFilterArg(q)
if err != nil {
return nil, err
}
return ec.c.EthSubscribe(ctx, ch, "newStateChanges", arg)
}
// Filters
// FilterLogs executes a filter query.

View File

@ -63,6 +63,7 @@ type Backend interface {
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription
SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription
// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error

View File

@ -234,6 +234,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven
return b.eth.blockchain.SubscribeRemovedLogsEvent(ch)
}
func (b *LesApiBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
return b.eth.blockchain.SubscribeStateChangeEvent(ch)
}
func (b *LesApiBackend) Downloader() *downloader.Downloader {
return b.eth.Downloader()
}

View File

@ -557,6 +557,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent)
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// SubscribeStateChangeEvent implements the interface of filters.Backend
// LightChain does not send core.StateChangeEvent, so return an empty subscription.
func (lc *LightChain) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription {
return lc.scope.Track(new(event.Feed).Subscribe(ch))
}
// DisableCheckFreq disables header validation. This is used for ultralight mode.
func (lc *LightChain) DisableCheckFreq() {
atomic.StoreInt32(&lc.disableCheckFreq, 1)

View File

@ -218,7 +218,7 @@ func MakePreState(db ethdb.Database, accounts core.GenesisAlloc, snapshotter boo
}
}
// Commit and re-open to start with a clean state.
root, _ := statedb.Commit(false)
root, _, _ := statedb.Commit(false)
var snaps *snapshot.Tree
if snapshotter {