core: use ChainHeadEvent subscription in the chain indexer (#17826)

This commit is contained in:
Felföldi Zsolt 2018-10-03 16:25:25 +02:00 committed by Péter Szilágyi
parent e5677114dc
commit 9d06b2c5f3

View File

@ -53,14 +53,14 @@ type ChainIndexerChain interface {
// CurrentHeader retrieves the latest locally known header. // CurrentHeader retrieves the latest locally known header.
CurrentHeader() *types.Header CurrentHeader() *types.Header
// SubscribeChainEvent subscribes to new head header notifications. // SubscribeChainHeadEvent subscribes to new head header notifications.
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription SubscribeChainHeadEvent(ch chan<- ChainHeadEvent) event.Subscription
} }
// ChainIndexer does a post-processing job for equally sized sections of the // ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is // canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a // connected to the blockchain through the event system by starting a
// ChainEventLoop in a goroutine. // ChainHeadEventLoop in a goroutine.
// //
// Further child ChainIndexers can be added which use the output of the parent // Further child ChainIndexers can be added which use the output of the parent
// section indexer. These child indexers receive new head notifications only // section indexer. These child indexers receive new head notifications only
@ -142,8 +142,8 @@ func (c *ChainIndexer) AddCheckpoint(section uint64, shead common.Hash) {
// cascading background processing. Children do not need to be started, they // cascading background processing. Children do not need to be started, they
// are notified about new events by their parents. // are notified about new events by their parents.
func (c *ChainIndexer) Start(chain ChainIndexerChain) { func (c *ChainIndexer) Start(chain ChainIndexerChain) {
events := make(chan ChainEvent, 10) events := make(chan ChainHeadEvent, 10)
sub := chain.SubscribeChainEvent(events) sub := chain.SubscribeChainHeadEvent(events)
go c.eventLoop(chain.CurrentHeader(), events, sub) go c.eventLoop(chain.CurrentHeader(), events, sub)
} }
@ -190,7 +190,7 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only // eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing // started for the outermost indexer to push chain head events into a processing
// queue. // queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) { func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainHeadEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown // Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1) atomic.StoreUint32(&c.active, 1)