diff --git a/ethstats/ethstats.go b/ethstats/ethstats.go index e0f4f95ff..0f386c02f 100644 --- a/ethstats/ethstats.go +++ b/ethstats/ethstats.go @@ -95,6 +95,8 @@ type Service struct { pongCh chan struct{} // Pong notifications are fed into this channel histCh chan []uint64 // History request block numbers are fed into this channel + headSub event.Subscription + txSub event.Subscription } // connWrapper is a wrapper to prevent concurrent-write or concurrent-read on the @@ -167,7 +169,12 @@ func New(node *node.Node, backend backend, engine consensus.Engine, url string) // Start implements node.Lifecycle, starting up the monitoring and reporting daemon. func (s *Service) Start() error { - go s.loop() + // Subscribe to chain events to execute updates on + chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) + s.headSub = s.backend.SubscribeChainHeadEvent(chainHeadCh) + txEventCh := make(chan core.NewTxsEvent, txChanSize) + s.txSub = s.backend.SubscribeNewTxsEvent(txEventCh) + go s.loop(chainHeadCh, txEventCh) log.Info("Stats daemon started") return nil @@ -175,22 +182,15 @@ func (s *Service) Start() error { // Stop implements node.Lifecycle, terminating the monitoring and reporting daemon. func (s *Service) Stop() error { + s.headSub.Unsubscribe() + s.txSub.Unsubscribe() log.Info("Stats daemon stopped") return nil } // loop keeps trying to connect to the netstats server, reporting chain events // until termination. -func (s *Service) loop() { - // Subscribe to chain events to execute updates on - chainHeadCh := make(chan core.ChainHeadEvent, chainHeadChanSize) - headSub := s.backend.SubscribeChainHeadEvent(chainHeadCh) - defer headSub.Unsubscribe() - - txEventCh := make(chan core.NewTxsEvent, txChanSize) - txSub := s.backend.SubscribeNewTxsEvent(txEventCh) - defer txSub.Unsubscribe() - +func (s *Service) loop(chainHeadCh chan core.ChainHeadEvent, txEventCh chan core.NewTxsEvent) { // Start a goroutine that exhausts the subscriptions to avoid events piling up var ( quitCh = make(chan struct{}) @@ -223,9 +223,9 @@ func (s *Service) loop() { } // node stopped - case <-txSub.Err(): + case <-s.txSub.Err(): break HandleLoop - case <-headSub.Err(): + case <-s.headSub.Err(): break HandleLoop } }