From ae458c99d50f10efa282f1362de2ae3a0d5baa2d Mon Sep 17 00:00:00 2001 From: Elizabeth Engelman Date: Thu, 21 May 2020 11:45:18 -0500 Subject: [PATCH] backends, core, eth, ethapi, filters, les, light: Subscribe to StateChangeEvents in filter event system --- accounts/abi/bind/backends/simulated.go | 4 ++ core/blockchain.go | 3 +- core/blockchain_test.go | 2 +- eth/api_backend.go | 4 ++ eth/filters/filter.go | 1 + eth/filters/filter_system.go | 54 ++++++++++++++----------- eth/filters/filter_system_test.go | 5 +++ internal/ethapi/backend.go | 1 + les/api_backend.go | 4 ++ light/lightchain.go | 6 +++ 10 files changed, 59 insertions(+), 25 deletions(-) diff --git a/accounts/abi/bind/backends/simulated.go b/accounts/abi/bind/backends/simulated.go index 0783b586e..ff2434d95 100644 --- a/accounts/abi/bind/backends/simulated.go +++ b/accounts/abi/bind/backends/simulated.go @@ -777,6 +777,10 @@ func (fb *filterBackend) SubscribePendingLogsEvent(ch chan<- []*types.Log) event return nullSubscription() } +func (fb *filterBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription { + return nullSubscription() +} + func (fb *filterBackend) BloomStatus() (uint64, uint64) { return 4096, 0 } func (fb *filterBackend) ServiceFilter(ctx context.Context, ms *bloombits.MatcherSession) { diff --git a/core/blockchain.go b/core/blockchain.go index 6445b3149..4b936d3bb 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -2463,6 +2463,7 @@ func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscr return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } -func (bc *BlockChain) SubscribeStateChangeEvents(ch chan<- StateChangeEvent) event.Subscription { +// SubscribeStateChangeEvent registers a subscription StateChangeEvent. +func (bc *BlockChain) SubscribeStateChangeEvent(ch chan<- StateChangeEvent) event.Subscription { return bc.scope.Track(bc.stateChangeEventFeed.Subscribe(ch)) } diff --git a/core/blockchain_test.go b/core/blockchain_test.go index 507fcf49c..db34a51e4 100644 --- a/core/blockchain_test.go +++ b/core/blockchain_test.go @@ -3048,7 +3048,7 @@ func testSendingStateChangeEvents(t *testing.T, numberOfEventsToSend int) { defer blockchain.Stop() stateChangeCh := make(chan StateChangeEvent) - blockchain.SubscribeStateChangeEvents(stateChangeCh) + blockchain.SubscribeStateChangeEvent(stateChangeCh) // create numberOfEventsToSend blocks that include State Changes chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, numberOfEventsToSend, func(i int, block *BlockGen) { diff --git a/eth/api_backend.go b/eth/api_backend.go index a82aee4eb..0a44c3f0d 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -220,6 +220,10 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri return b.eth.BlockChain().SubscribeLogsEvent(ch) } +func (b *EthAPIBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription { + return b.eth.BlockChain().SubscribeStateChangeEvent(ch) +} + func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { return b.eth.txPool.AddLocal(signedTx) } diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 17635837a..0384e5a5a 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -42,6 +42,7 @@ type Backend interface { SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription + SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a105ec51c..21e73e1c8 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -24,7 +24,7 @@ import ( "sync" "time" - ethereum "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/rawdb" @@ -88,20 +88,22 @@ type EventSystem struct { lastHead *types.Header // Subscriptions - txsSub event.Subscription // Subscription for new transaction event - logsSub event.Subscription // Subscription for new log event - rmLogsSub event.Subscription // Subscription for removed log event - pendingLogsSub event.Subscription // Subscription for pending log event - chainSub event.Subscription // Subscription for new chain event + txsSub event.Subscription // Subscription for new transaction event + logsSub event.Subscription // Subscription for new log event + rmLogsSub event.Subscription // Subscription for removed log event + pendingLogsSub event.Subscription // Subscription for pending log event + chainSub event.Subscription // Subscription for new chain event + stateChangeEventSub event.Subscription // Subscription for new state change event // Channels - install chan *subscription // install filter for event notification - uninstall chan *subscription // remove filter for event notification - txsCh chan core.NewTxsEvent // Channel to receive new transactions event - logsCh chan []*types.Log // Channel to receive new log event - pendingLogsCh chan []*types.Log // Channel to receive new log event - rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event - chainCh chan core.ChainEvent // Channel to receive new chain event + install chan *subscription // install filter for event notification + uninstall chan *subscription // remove filter for event notification + txsCh chan core.NewTxsEvent // Channel to receive new transactions event + logsCh chan []*types.Log // Channel to receive new log event + pendingLogsCh chan []*types.Log // Channel to receive new log event + rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event + chainCh chan core.ChainEvent // Channel to receive new chain event + stateChangeEventChan chan core.StateChangeEvent // Channel to receive new state change event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -112,15 +114,16 @@ type EventSystem struct { // or by stopping the given mux. func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m := &EventSystem{ - backend: backend, - lightMode: lightMode, - install: make(chan *subscription), - uninstall: make(chan *subscription), - txsCh: make(chan core.NewTxsEvent, txChanSize), - logsCh: make(chan []*types.Log, logsChanSize), - rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), - pendingLogsCh: make(chan []*types.Log, logsChanSize), - chainCh: make(chan core.ChainEvent, chainEvChanSize), + backend: backend, + lightMode: lightMode, + install: make(chan *subscription), + uninstall: make(chan *subscription), + txsCh: make(chan core.NewTxsEvent, txChanSize), + logsCh: make(chan []*types.Log, logsChanSize), + rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize), + pendingLogsCh: make(chan []*types.Log, logsChanSize), + chainCh: make(chan core.ChainEvent, chainEvChanSize), + stateChangeEventChan: make(chan core.StateChangeEvent, stateChangeChanSize), } // Subscribe events @@ -129,9 +132,10 @@ func NewEventSystem(backend Backend, lightMode bool) *EventSystem { m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh) m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.pendingLogsSub = m.backend.SubscribePendingLogsEvent(m.pendingLogsCh) + m.stateChangeEventSub = m.backend.SubscribeStateChangeEvent(m.stateChangeEventChan) // Make sure none of the subscriptions are empty - if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil { + if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil || m.pendingLogsSub == nil || m.stateChangeEventSub == nil { log.Crit("Subscribe for event system failed") } @@ -167,6 +171,7 @@ func (sub *Subscription) Unsubscribe() { case <-sub.f.logs: case <-sub.f.hashes: case <-sub.f.headers: + case <-sub.f.stateChangePayloads: } } @@ -448,6 +453,7 @@ func (es *EventSystem) eventLoop() { es.rmLogsSub.Unsubscribe() es.pendingLogsSub.Unsubscribe() es.chainSub.Unsubscribe() + es.stateChangeEventSub.Unsubscribe() }() index := make(filterIndex) @@ -497,6 +503,8 @@ func (es *EventSystem) eventLoop() { return case <-es.chainSub.Err(): return + case <-es.stateChangeEventSub.Err(): + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c8d1d43ab..796483b54 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -47,6 +47,7 @@ type testBackend struct { rmLogsFeed event.Feed pendingLogsFeed event.Feed chainFeed event.Feed + stateChangeFeed event.Feed } func (b *testBackend) ChainDb() ethdb.Database { @@ -121,6 +122,10 @@ func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subsc return b.chainFeed.Subscribe(ch) } +func (b *testBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription { + return b.stateChangeFeed.Subscribe(ch) +} + func (b *testBackend) BloomStatus() (uint64, uint64) { return params.BloomBitsBlocks, b.sections } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 245091df3..3fceeb65f 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -63,6 +63,7 @@ type Backend interface { SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription + SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error diff --git a/les/api_backend.go b/les/api_backend.go index 756beaf6a..8dd38aad2 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -234,6 +234,10 @@ func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEven return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) } +func (b *LesApiBackend) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription { + return b.eth.blockchain.SubscribeStateChangeEvent(ch) +} + func (b *LesApiBackend) Downloader() *downloader.Downloader { return b.eth.Downloader() } diff --git a/light/lightchain.go b/light/lightchain.go index 79eba62c9..b26ad160e 100644 --- a/light/lightchain.go +++ b/light/lightchain.go @@ -557,6 +557,12 @@ func (lc *LightChain) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) return lc.scope.Track(new(event.Feed).Subscribe(ch)) } +// SubscribeStateChangeEvent implements the interface of filters.Backend +// LightChain does not send core.StateChangeEvent, so return an empty subscription. +func (lc *LightChain) SubscribeStateChangeEvent(ch chan<- core.StateChangeEvent) event.Subscription { + return lc.scope.Track(new(event.Feed).Subscribe(ch)) +} + // DisableCheckFreq disables header validation. This is used for ultralight mode. func (lc *LightChain) DisableCheckFreq() { atomic.StoreInt32(&lc.disableCheckFreq, 1)