This commit is contained in:
Roy Crihfield 2020-11-19 20:25:45 +08:00
parent 02c7e785c5
commit bf02717f60
2 changed files with 8 additions and 5 deletions

View File

@ -116,7 +116,7 @@ type Service struct {
// A mapping of subscription params rlp hash to the corresponding subscription params // A mapping of subscription params rlp hash to the corresponding subscription params
SubscriptionTypes map[common.Hash]Params SubscriptionTypes map[common.Hash]Params
// Cache the last block so that we can avoid having to lookup the next block's parent // Cache the last block so that we can avoid having to lookup the next block's parent
lastBlock blockCache BlockCache blockCache
// Whether or not we have any subscribers; only if we do, do we processes state diffs // Whether or not we have any subscribers; only if we do, do we processes state diffs
subscribers int32 subscribers int32
// Interface for publishing statediffs as PG-IPLD objects // Interface for publishing statediffs as PG-IPLD objects
@ -134,7 +134,7 @@ type blockCache struct {
maxSize uint maxSize uint
} }
func newBlockCache(max uint) blockCache { func NewBlockCache(max uint) blockCache {
return blockCache{ return blockCache{
blocks: make(map[common.Hash]*types.Block), blocks: make(map[common.Hash]*types.Block),
maxSize: max, maxSize: max,
@ -173,7 +173,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, params ServiceParams) error {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]Subscription),
SubscriptionTypes: make(map[common.Hash]Params), SubscriptionTypes: make(map[common.Hash]Params),
lastBlock: newBlockCache(workers), BlockCache: NewBlockCache(workers),
indexer: indexer, indexer: indexer,
enableWriteLoop: params.EnableWriteLoop, enableWriteLoop: params.EnableWriteLoop,
numWorkers: workers, numWorkers: workers,
@ -230,7 +230,7 @@ func (sds *Service) WriteLoop(chainEventCh chan core.ChainEvent) {
log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent) log.Debug("(WriteLoop) Event received from chainEventCh", "event", chainEvent)
currentBlock := chainEvent.Block currentBlock := chainEvent.Block
statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64())) statediffMetrics.lastEventHeight.Update(int64(currentBlock.Number().Uint64()))
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain)
if parentBlock == nil { if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number()) log.Error("Parent block is nil, skipping this block", "block height", currentBlock.Number())
continue continue
@ -271,7 +271,7 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
continue continue
} }
currentBlock := chainEvent.Block currentBlock := chainEvent.Block
parentBlock := sds.lastBlock.replace(currentBlock, sds.BlockChain) parentBlock := sds.BlockCache.replace(currentBlock, sds.BlockChain)
if parentBlock == nil { if parentBlock == nil {
log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number()) log.Error("Parent block is nil, skipping this block", "number", currentBlock.Number())
continue continue

View File

@ -94,6 +94,7 @@ func testErrorInChainEventLoop(t *testing.T) {
QuitChan: serviceQuit, QuitChan: serviceQuit,
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
payloadChan := make(chan statediff.Payload, 2) payloadChan := make(chan statediff.Payload, 2)
quitChan := make(chan bool) quitChan := make(chan bool)
@ -177,6 +178,7 @@ func testErrorInBlockLoop(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
payloadChan := make(chan statediff.Payload) payloadChan := make(chan statediff.Payload)
quitChan := make(chan bool) quitChan := make(chan bool)
@ -256,6 +258,7 @@ func testErrorInStateDiffAt(t *testing.T) {
QuitChan: make(chan bool), QuitChan: make(chan bool),
Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription), Subscriptions: make(map[common.Hash]map[rpc.ID]statediff.Subscription),
SubscriptionTypes: make(map[common.Hash]statediff.Params), SubscriptionTypes: make(map[common.Hash]statediff.Params),
BlockCache: statediff.NewBlockCache(1),
} }
stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams) stateDiffPayload, err := service.StateDiffAt(testBlock1.NumberU64(), defaultParams)
if err != nil { if err != nil {