From 18861eaf795e38c51d209745bd0f8333661fa3b3 Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Tue, 4 Oct 2022 13:31:29 +0530 Subject: [PATCH] Implement cache for latest updated entities to be used in mapping code (#194) * Load relations according to GQL query * Implement cache for latest entities to used in mapping code * Add metrics for cache hit and fix caching pruned entities * Changes in codegen and graph-test-watcher * Remove entity load counter reset to zero --- .../src/templates/indexer-template.handlebars | 24 +- .../templates/job-runner-template.handlebars | 2 +- .../templates/resolvers-template.handlebars | 12 +- packages/eden-watcher/src/indexer.ts | 29 +- packages/eden-watcher/src/job-runner.ts | 2 +- packages/eden-watcher/src/resolvers.ts | 226 ++++++++-- packages/erc721-watcher/src/indexer.ts | 4 +- packages/graph-node/src/database.ts | 410 ++++++++++++------ packages/graph-node/src/loader.ts | 6 +- packages/graph-node/src/watcher.ts | 67 ++- packages/graph-test-watcher/src/indexer.ts | 58 ++- packages/graph-test-watcher/src/job-runner.ts | 2 +- packages/graph-test-watcher/src/resolvers.ts | 56 ++- packages/mobymask-watcher/src/indexer.ts | 4 +- packages/util/src/database.ts | 32 +- packages/util/src/index-block.ts | 2 +- packages/util/src/job-runner.ts | 2 +- packages/util/src/metrics.ts | 15 + packages/util/src/types.ts | 2 +- 19 files changed, 702 insertions(+), 253 deletions(-) diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index ab41d254..e0d9ebcf 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -8,6 +8,7 @@ import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; +import { SelectionNode } from 'graphql'; import { JsonFragment } from '@ethersproject/abi'; import { BaseProvider } from '@ethersproject/providers'; @@ -350,12 +351,16 @@ export class Indexer implements IPLDIndexerInterface { return createStateCheckpoint(this, contractAddress, blockHash); } - async processCanonicalBlock (blockHash: string): Promise { + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); // Call custom stateDiff hook. await createStateDiff(this, blockHash); + {{#if (subgraphPath)}} + + this._graphWatcher.pruneEntityCacheFrothyBlocks(blockHash, blockNumber); + {{/if}} } async processCheckpoint (blockHash: string): Promise { @@ -442,8 +447,13 @@ export class Indexer implements IPLDIndexerInterface { } {{#if (subgraphPath)}} - async getSubgraphEntity (entity: new () => Entity, id: string, block?: BlockHeight): Promise { - const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block); + async getSubgraphEntity ( + entity: new () => Entity, + id: string, + block: BlockHeight, + selections: ReadonlyArray = [] + ): Promise { + const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block, selections); return data; } @@ -466,9 +476,13 @@ export class Indexer implements IPLDIndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (blockHash: string, blockNumber: number): Promise { + async processBlock (blockProgress: BlockProgress): Promise { // Call a function to create initial state for contracts. - await this._baseIndexer.createInit(this, blockHash, blockNumber); + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); + + {{#if (subgraphPath)}} + this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); + {{/if}} } {{#if (subgraphPath)}} diff --git a/packages/codegen/src/templates/job-runner-template.handlebars b/packages/codegen/src/templates/job-runner-template.handlebars index efbf13d8..99f38982 100644 --- a/packages/codegen/src/templates/job-runner-template.handlebars +++ b/packages/codegen/src/templates/job-runner-template.handlebars @@ -105,7 +105,7 @@ export class JobRunner { } // Process the hooks for the given block number. - await this._indexer.processCanonicalBlock(blockHash); + await this._indexer.processCanonicalBlock(blockHash, blockNumber); // Update the IPLD status. await this._indexer.updateIPLDStatusHooksBlock(blockNumber); diff --git a/packages/codegen/src/templates/resolvers-template.handlebars b/packages/codegen/src/templates/resolvers-template.handlebars index 26494dc3..c26d859f 100644 --- a/packages/codegen/src/templates/resolvers-template.handlebars +++ b/packages/codegen/src/templates/resolvers-template.handlebars @@ -6,7 +6,7 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; import Decimal from 'decimal.js'; -import { GraphQLScalarType } from 'graphql'; +import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer } from '@cerc-io/util'; @@ -78,12 +78,18 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch {{/each}} {{~#each subgraphQueries}} - {{this.queryName}}: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + {{this.queryName}}: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('{{this.queryName}}', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('{{this.queryName}}').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity({{this.entityName}}, id, block); + return indexer.getSubgraphEntity({{this.entityName}}, id, block, info.fieldNodes[0].selectionSet.selections); }, {{/each}} diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 2c68a75c..965cec46 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -8,6 +8,7 @@ import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; +import { SelectionNode } from 'graphql'; import { JsonFragment } from '@ethersproject/abi'; import { BaseProvider } from '@ethersproject/providers'; @@ -264,16 +265,16 @@ export class Indexer implements IPLDIndexerInterface { return createStateCheckpoint(this, contractAddress, blockHash); } - async processCanonicalBlock (blockHash: string): Promise { + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs'); - // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); - console.timeEnd('time:indexer#processCanonicalBlock-finalize_auto_diffs'); // Call custom stateDiff hook. await createStateDiff(this, blockHash); + + this._graphWatcher.pruneEntityCacheFrothyBlocks(blockHash, blockNumber); } async processCheckpoint (blockHash: string): Promise { @@ -371,14 +372,20 @@ export class Indexer implements IPLDIndexerInterface { await this._baseIndexer.removeIPLDBlocks(blockNumber, kind); } - async getSubgraphEntity (entity: new () => Entity, id: string, block?: BlockHeight): Promise { - const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block); + async getSubgraphEntity (entity: new () => Entity, id: string, block: BlockHeight, selections: ReadonlyArray = []): Promise { + const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block, selections); return data; } - async getSubgraphEntities (entity: new () => Entity, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions = {}): Promise { - return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions); + async getSubgraphEntities ( + entity: new () => Entity, + block: BlockHeight, + where: { [key: string]: any } = {}, + queryOptions: QueryOptions = {}, + selections: ReadonlyArray = [] + ): Promise { + return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions, selections); } async triggerIndexingOnEvent (event: Event): Promise { @@ -400,13 +407,13 @@ export class Indexer implements IPLDIndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (blockHash: string, blockNumber: number): Promise { + async processBlock (blockProgress: BlockProgress): Promise { console.time('time:indexer#processBlock-init_state'); - // Call a function to create initial state for contracts. - await this._baseIndexer.createInit(this, blockHash, blockNumber); - + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); console.timeEnd('time:indexer#processBlock-init_state'); + + this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); } async processBlockAfterEvents (blockHash: string): Promise { diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index 7be46d6a..240b3f18 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -102,7 +102,7 @@ export class JobRunner { } // Process the hooks for the given block number. - await this._indexer.processCanonicalBlock(blockHash); + await this._indexer.processCanonicalBlock(blockHash, blockNumber); // Update the IPLD status. await this._indexer.updateIPLDStatusHooksBlock(blockNumber); diff --git a/packages/eden-watcher/src/resolvers.ts b/packages/eden-watcher/src/resolvers.ts index ba1ed63f..5e8fb008 100644 --- a/packages/eden-watcher/src/resolvers.ts +++ b/packages/eden-watcher/src/resolvers.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; import Decimal from 'decimal.js'; -import { GraphQLScalarType } from 'graphql'; +import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; import { BlockHeight, OrderDirection, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer } from '@cerc-io/util'; @@ -77,200 +77,336 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Query: { - producer: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + producer: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('producer', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producer').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Producer, id, block); + return indexer.getSubgraphEntity(Producer, id, block, info.fieldNodes[0].selectionSet.selections); }, - producers: async (_: any, { block = {}, first, skip }: { block: BlockHeight, first: number, skip: number }) => { + producers: async ( + _: any, + { block = {}, first, skip }: { block: BlockHeight, first: number, skip: number }, + __: any, + info: GraphQLResolveInfo + ) => { log('producers', JSON.stringify(block, jsonBigIntStringReplacer), first, skip); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producers').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getSubgraphEntities( Producer, block, {}, - { limit: first, skip } + { limit: first, skip }, + info.fieldNodes[0].selectionSet.selections ); }, - producerSet: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + producerSet: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('producerSet', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producerSet').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(ProducerSet, id, block); + return indexer.getSubgraphEntity(ProducerSet, id, block, info.fieldNodes[0].selectionSet.selections); }, - producerSetChange: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + producerSetChange: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('producerSetChange', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producerSetChange').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(ProducerSetChange, id, block); + return indexer.getSubgraphEntity(ProducerSetChange, id, block, info.fieldNodes[0].selectionSet.selections); }, - producerRewardCollectorChange: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + producerRewardCollectorChange: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('producerRewardCollectorChange', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producerRewardCollectorChange').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(ProducerRewardCollectorChange, id, block); + return indexer.getSubgraphEntity(ProducerRewardCollectorChange, id, block, info.fieldNodes[0].selectionSet.selections); }, - rewardScheduleEntry: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + rewardScheduleEntry: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('rewardScheduleEntry', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('rewardScheduleEntry').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(RewardScheduleEntry, id, block); + return indexer.getSubgraphEntity(RewardScheduleEntry, id, block, info.fieldNodes[0].selectionSet.selections); }, - rewardSchedule: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + rewardSchedule: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('rewardSchedule', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('rewardSchedule').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(RewardSchedule, id, block); + return indexer.getSubgraphEntity(RewardSchedule, id, block, info.fieldNodes[0].selectionSet.selections); }, - producerEpoch: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + producerEpoch: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('producerEpoch', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('producerEpoch').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(ProducerEpoch, id, block); + return indexer.getSubgraphEntity(ProducerEpoch, id, block, info.fieldNodes[0].selectionSet.selections); }, - block: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + block: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('block', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('block').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Block, id, block); + return indexer.getSubgraphEntity(Block, id, block, info.fieldNodes[0].selectionSet.selections); }, - blocks: async (_: any, { block = {}, where, first, skip, orderBy, orderDirection }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number, orderBy: string, orderDirection: OrderDirection }) => { + blocks: async ( + _: any, + { block = {}, where, first, skip, orderBy, orderDirection }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number, orderBy: string, orderDirection: OrderDirection }, + __: any, + info: GraphQLResolveInfo + ) => { log('blocks', JSON.stringify(block, jsonBigIntStringReplacer), JSON.stringify(where, jsonBigIntStringReplacer), first, skip, orderBy, orderDirection); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('blocks').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getSubgraphEntities( Block, block, where, - { limit: first, skip, orderBy, orderDirection } + { limit: first, skip, orderBy, orderDirection }, + info.fieldNodes[0].selectionSet.selections ); }, - epoch: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + epoch: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('epoch', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('epoch').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Epoch, id, block); + return indexer.getSubgraphEntity(Epoch, id, block, info.fieldNodes[0].selectionSet.selections); }, - epoches: async (_: any, { block = {}, where, first, skip }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number }) => { + epoches: async ( + _: any, + { block = {}, where, first, skip }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number }, + __: any, + info: GraphQLResolveInfo + ) => { log('epoches', JSON.stringify(block, jsonBigIntStringReplacer), JSON.stringify(where, jsonBigIntStringReplacer), first, skip); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('epoches').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getSubgraphEntities( Epoch, block, where, - { limit: first, skip } + { limit: first, skip }, + info.fieldNodes[0].selectionSet.selections ); }, - slotClaim: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + slotClaim: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('slotClaim', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('slotClaim').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(SlotClaim, id, block); + return indexer.getSubgraphEntity(SlotClaim, id, block, info.fieldNodes[0].selectionSet.selections); }, - slot: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + slot: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('slot', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('slot').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Slot, id, block); + return indexer.getSubgraphEntity(Slot, id, block, info.fieldNodes[0].selectionSet.selections); }, - staker: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + staker: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('staker', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('staker').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Staker, id, block); + return indexer.getSubgraphEntity(Staker, id, block, info.fieldNodes[0].selectionSet.selections); }, - stakers: async (_: any, { block = {}, where, first, skip, orderBy, orderDirection }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number, orderBy: string, orderDirection: OrderDirection }) => { + stakers: async ( + _: any, + { block = {}, where, first, skip, orderBy, orderDirection }: { block: BlockHeight, where: { [key: string]: any }, first: number, skip: number, orderBy: string, orderDirection: OrderDirection }, + __: any, + info: GraphQLResolveInfo + ) => { log('stakers', JSON.stringify(block, jsonBigIntStringReplacer), JSON.stringify(where, jsonBigIntStringReplacer), first, skip, orderBy, orderDirection); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('stakers').inc(1); + assert(info.fieldNodes[0].selectionSet); return indexer.getSubgraphEntities( Staker, block, where, - { limit: first, skip, orderBy, orderDirection } + { limit: first, skip, orderBy, orderDirection }, + info.fieldNodes[0].selectionSet.selections ); }, - network: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + network: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('network', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('network').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Network, id, block); + return indexer.getSubgraphEntity(Network, id, block, info.fieldNodes[0].selectionSet.selections); }, - distributor: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + distributor: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('distributor', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('distributor').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Distributor, id, block); + return indexer.getSubgraphEntity(Distributor, id, block, info.fieldNodes[0].selectionSet.selections); }, - distribution: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + distribution: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('distribution', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('distribution').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Distribution, id, block); + return indexer.getSubgraphEntity(Distribution, id, block, info.fieldNodes[0].selectionSet.selections); }, - claim: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + claim: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('claim', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('claim').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Claim, id, block); + return indexer.getSubgraphEntity(Claim, id, block, info.fieldNodes[0].selectionSet.selections); }, - slash: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + slash: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('slash', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('slash').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Slash, id, block); + return indexer.getSubgraphEntity(Slash, id, block, info.fieldNodes[0].selectionSet.selections); }, - account: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }) => { + account: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('account', id, JSON.stringify(block, jsonBigIntStringReplacer)); gqlTotalQueryCount.inc(1); gqlQueryCount.labels('account').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Account, id, block); + return indexer.getSubgraphEntity(Account, id, block, info.fieldNodes[0].selectionSet.selections); }, events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name?: string }) => { diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index 70624bad..bd370bbc 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -794,9 +794,9 @@ export class Indexer implements IPLDIndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (blockHash: string, blockNumber: number): Promise { + async processBlock (blockProgress: BlockProgress): Promise { // Call a function to create initial state for contracts. - await this._baseIndexer.createInit(this, blockHash, blockNumber); + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); } parseEventNameAndArgs (kind: string, logObj: any): any { diff --git a/packages/graph-node/src/database.ts b/packages/graph-node/src/database.ts index 6ca055ed..a2d15bbd 100644 --- a/packages/graph-node/src/database.ts +++ b/packages/graph-node/src/database.ts @@ -12,11 +12,16 @@ import { QueryRunner, Repository } from 'typeorm'; +import { SelectionNode } from 'graphql'; +import _ from 'lodash'; +import debug from 'debug'; import { BlockHeight, BlockProgressInterface, Database as BaseDatabase, + eventProcessingLoadEntityCacheHitCount, + eventProcessingLoadEntityDBQueryDuration, QueryOptions, Where } from '@cerc-io/util'; @@ -25,11 +30,30 @@ import { Block, fromEntityValue, fromStateEntityValues, toEntityValue } from './ export const DEFAULT_LIMIT = 100; +const log = debug('vulcanize:graph-node-database'); + +interface CachedEntities { + frothyBlocks: Map< + string, + { + blockNumber: number; + parentHash: string; + entities: Map>; + } + >; + latestPrunedEntities: Map>; +} + export class Database { _config: ConnectionOptions _conn!: Connection _baseDatabase: BaseDatabase + _cachedEntities: CachedEntities = { + frothyBlocks: new Map(), + latestPrunedEntities: new Map() + } + constructor (config: ConnectionOptions, entitiesPath: string) { assert(config); @@ -42,6 +66,10 @@ export class Database { this._baseDatabase = new BaseDatabase(this._config); } + get cachedEntities () { + return this._cachedEntities; + } + async init (): Promise { this._conn = await this._baseDatabase.init(); } @@ -54,11 +82,11 @@ export class Database { return this._baseDatabase.createTransactionRunner(); } - async getEntity (entity: (new () => Entity) | string, id: string, blockHash?: string): Promise { + async getEntity (entityName: string, id: string, blockHash?: string): Promise { const queryRunner = this._conn.createQueryRunner(); try { - const repo = queryRunner.manager.getRepository(entity); + const repo: Repository = queryRunner.manager.getRepository(entityName); const whereOptions: { [key: string]: any } = { id }; @@ -73,13 +101,65 @@ export class Database { } }; - let entityData = await repo.findOne(findOptions as FindOneOptions); + if (findOptions.where.blockHash) { + // Check cache only if latestPrunedEntities is updated. + // latestPrunedEntities is updated when frothyBlocks is filled till canonical block height. + if (this._cachedEntities.latestPrunedEntities.size > 0) { + let frothyBlock = this._cachedEntities.frothyBlocks.get(findOptions.where.blockHash); + let canonicalBlockNumber = -1; - if (!entityData && findOptions.where.blockHash) { - entityData = await this._baseDatabase.getPrevEntityVersion(queryRunner, repo, findOptions); + // Loop through frothy region until latest entity is found. + while (frothyBlock) { + const entity = frothyBlock.entities + .get(repo.metadata.tableName) + ?.get(findOptions.where.id); + + if (entity) { + eventProcessingLoadEntityCacheHitCount.inc(); + return _.cloneDeep(entity) as Entity; + } + + canonicalBlockNumber = frothyBlock.blockNumber + 1; + frothyBlock = this._cachedEntities.frothyBlocks.get(frothyBlock.parentHash); + } + + // Canonical block number is not assigned if blockHash does not exist in frothy region. + // Get latest pruned entity from cache only if blockHash exists in frothy region. + // i.e. Latest entity in cache is the version before frothy region. + if (canonicalBlockNumber > -1) { + // If entity not found in frothy region get latest entity in the pruned region. + // Check if latest entity is cached in pruned region. + const entity = this._cachedEntities.latestPrunedEntities + .get(repo.metadata.tableName) + ?.get(findOptions.where.id); + + if (entity) { + eventProcessingLoadEntityCacheHitCount.inc(); + return _.cloneDeep(entity) as Entity; + } + + // Get latest pruned entity from DB if not found in cache. + const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer(); + const dbEntity = await this._baseDatabase.getLatestPrunedEntity(repo, findOptions.where.id, canonicalBlockNumber); + endTimer(); + + if (dbEntity) { + // Update latest pruned entity in cache. + this.cacheUpdatedEntity(entityName, dbEntity, true); + } + + return dbEntity; + } + } + + const endTimer = eventProcessingLoadEntityDBQueryDuration.startTimer(); + const dbEntity = await this._baseDatabase.getPrevEntityVersion(repo.queryRunner!, repo, findOptions); + endTimer(); + + return dbEntity; } - return entityData; + return repo.findOne(findOptions as FindOneOptions); } catch (error) { console.log(error); } finally { @@ -124,7 +204,14 @@ export class Database { return count > 0; } - async getEntityWithRelations (queryRunner: QueryRunner, entity: (new () => Entity), id: string, relationsMap: Map, block: BlockHeight = {}, depth = 1): Promise { + async getEntityWithRelations ( + queryRunner: QueryRunner, + entity: (new () => Entity), + id: string, + relationsMap: Map, + block: BlockHeight = {}, + selections: ReadonlyArray = [] + ): Promise { let { hash: blockHash, number: blockNumber } = block; const repo = queryRunner.manager.getRepository(entity); const whereOptions: any = { id }; @@ -154,26 +241,33 @@ export class Database { // Get relational fields if (entityData) { - entityData = await this.loadEntityRelations(queryRunner, block, relationsMap, entity, entityData, depth); + entityData = await this.loadEntityRelations(queryRunner, block, relationsMap, entity, entityData, selections); } return entityData; } - async loadEntityRelations (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map, entity: new () => Entity, entityData: any, depth: number): Promise { - // Only support two-level nesting of relations - if (depth > 2) { - return entityData; - } - + async loadEntityRelations ( + queryRunner: QueryRunner, + block: BlockHeight, + relationsMap: Map, + entity: new () => Entity, entityData: any, + selections: ReadonlyArray = [] + ): Promise { const relations = relationsMap.get(entity); if (relations === undefined) { return entityData; } - const relationPromises = Object.entries(relations) - .map(async ([field, data]) => { - const { entity: relationEntity, isArray, isDerived, field: foreignKey } = data; + const relationPromises = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])) + .map(async (selection) => { + assert(selection.kind === 'Field'); + const field = selection.name.value; + const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field]; + let childSelections = selection.selectionSet?.selections || []; + + // Filter out __typename field in GQL for loading relations. + childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename')); if (isDerived) { const where: Where = { @@ -191,7 +285,7 @@ export class Database { block, where, { limit: DEFAULT_LIMIT }, - depth + 1 + childSelections ); entityData[field] = relatedEntities; @@ -215,7 +309,7 @@ export class Database { block, where, { limit: DEFAULT_LIMIT }, - depth + 1 + childSelections ); entityData[field] = relatedEntities; @@ -230,7 +324,7 @@ export class Database { entityData[field], relationsMap, block, - depth + 1 + childSelections ); entityData[field] = relatedEntity; @@ -241,7 +335,15 @@ export class Database { return entityData; } - async getEntities (queryRunner: QueryRunner, entity: new () => Entity, relationsMap: Map, block: BlockHeight, where: Where = {}, queryOptions: QueryOptions = {}, depth = 1): Promise { + async getEntities ( + queryRunner: QueryRunner, + entity: new () => Entity, + relationsMap: Map, + block: BlockHeight, + where: Where = {}, + queryOptions: QueryOptions = {}, + selections: ReadonlyArray = [] + ): Promise { const repo = queryRunner.manager.getRepository(entity); const { tableName } = repo.metadata; @@ -297,27 +399,134 @@ export class Database { return []; } - return this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, depth); + return this.loadEntitiesRelations(queryRunner, block, relationsMap, entity, entities, selections); } - async loadEntitiesRelations (queryRunner: QueryRunner, block: BlockHeight, relationsMap: Map, entity: new () => Entity, entities: Entity[], depth: number): Promise { - // Only support two-level nesting of relations - if (depth > 2) { - return entities; - } - + async loadEntitiesRelations ( + queryRunner: QueryRunner, + block: BlockHeight, + relationsMap: Map, + entity: new () => Entity, + entities: Entity[], + selections: ReadonlyArray = [] + ): Promise { const relations = relationsMap.get(entity); if (relations === undefined) { return entities; } - const relationPromises = Object.entries(relations).map(async ([field, data]) => { - const { entity: relationEntity, isArray, isDerived, field: foreignKey } = data; + const relationPromises = selections.filter((selection) => selection.kind === 'Field' && Boolean(relations[selection.name.value])) + .map(async selection => { + assert(selection.kind === 'Field'); + const field = selection.name.value; + const { entity: relationEntity, isArray, isDerived, field: foreignKey } = relations[field]; + let childSelections = selection.selectionSet?.selections || []; + + // Filter out __typename field in GQL for loading relations. + childSelections = childSelections.filter(selection => !(selection.kind === 'Field' && selection.name.value === '__typename')); + + if (isDerived) { + const where: Where = { + [foreignKey]: [{ + value: entities.map((entity: any) => entity.id), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + {}, + childSelections + ); + + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { + // Related entity might be loaded with data. + const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; + + if (!acc[parentEntityId]) { + acc[parentEntityId] = []; + } + + if (acc[parentEntityId].length < DEFAULT_LIMIT) { + acc[parentEntityId].push(entity); + } + + return acc; + }, {}); + + entities.forEach((entity: any) => { + if (relatedEntitiesMap[entity.id]) { + entity[field] = relatedEntitiesMap[entity.id]; + } else { + entity[field] = []; + } + }); + + return; + } + + if (isArray) { + const relatedIds = entities.reduce((acc: Set, entity: any) => { + entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId)); + + return acc; + }, new Set()); + + const where: Where = { + id: [{ + value: Array.from(relatedIds), + not: false, + operator: 'in' + }] + }; + + const relatedEntities = await this.getEntities( + queryRunner, + relationEntity, + relationsMap, + block, + where, + {}, + childSelections + ); + + entities.forEach((entity: any) => { + const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { + acc.add(id); + + return acc; + }, new Set()); + + entity[field] = []; + + relatedEntities.forEach((relatedEntity: any) => { + if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { + entity[field].push(relatedEntity); + } + }); + }); + + return; + } + + // field is neither an array nor derivedFrom + if (childSelections.length === 1 && childSelections[0].kind === 'Field' && childSelections[0].name.value === 'id') { + // Avoid loading relation if selections only has id field. + entities.forEach((entity: any) => { + entity[field] = { id: entity[field] }; + }); + + return; + } - if (isDerived) { const where: Where = { - [foreignKey]: [{ - value: entities.map((entity: any) => entity.id), + id: [{ + value: entities.map((entity: any) => entity[field]), not: false, operator: 'in' }] @@ -330,121 +539,32 @@ export class Database { block, where, {}, - depth + 1 + childSelections ); - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any[]}, entity: any) => { - // Related entity might be loaded with data. - const parentEntityId = entity[foreignKey].id ?? entity[foreignKey]; - - if (!acc[parentEntityId]) { - acc[parentEntityId] = []; - } - - if (acc[parentEntityId].length < DEFAULT_LIMIT) { - acc[parentEntityId].push(entity); - } + const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { + acc[entity.id] = entity; return acc; }, {}); entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity.id]) { - entity[field] = relatedEntitiesMap[entity.id]; - } else { - entity[field] = []; + if (relatedEntitiesMap[entity[field]]) { + entity[field] = relatedEntitiesMap[entity[field]]; } }); - - return; - } - - if (isArray) { - const relatedIds = entities.reduce((acc: Set, entity: any) => { - entity[field].forEach((relatedEntityId: string) => acc.add(relatedEntityId)); - - return acc; - }, new Set()); - - const where: Where = { - id: [{ - value: Array.from(relatedIds), - not: false, - operator: 'in' - }] - }; - - const relatedEntities = await this.getEntities( - queryRunner, - relationEntity, - relationsMap, - block, - where, - {}, - depth + 1 - ); - - entities.forEach((entity: any) => { - const relatedEntityIds: Set = entity[field].reduce((acc: Set, id: string) => { - acc.add(id); - - return acc; - }, new Set()); - - entity[field] = []; - - relatedEntities.forEach((relatedEntity: any) => { - if (relatedEntityIds.has(relatedEntity.id) && entity[field].length < DEFAULT_LIMIT) { - entity[field].push(relatedEntity); - } - }); - }); - - return; - } - - // field is neither an array nor derivedFrom - const where: Where = { - id: [{ - value: entities.map((entity: any) => entity[field]), - not: false, - operator: 'in' - }] - }; - - const relatedEntities = await this.getEntities( - queryRunner, - relationEntity, - relationsMap, - block, - where, - {}, - depth + 1 - ); - - const relatedEntitiesMap = relatedEntities.reduce((acc: {[key:string]: any}, entity: any) => { - acc[entity.id] = entity; - - return acc; - }, {}); - - entities.forEach((entity: any) => { - if (relatedEntitiesMap[entity[field]]) { - entity[field] = relatedEntitiesMap[entity[field]]; - } }); - }); await Promise.all(relationPromises); return entities; } - async saveEntity (entity: string, data: any): Promise { + async saveEntity (entity: string, data: any): Promise { const repo = this._conn.getRepository(entity); const dbEntity: any = repo.create(data); - await repo.save(dbEntity); + return repo.save(dbEntity); } async toGraphEntity (instanceExports: any, entity: string, data: any, entityTypes: { [key: string]: string }): Promise { @@ -557,6 +677,38 @@ export class Database { }, {}); } + cacheUpdatedEntity (entityName: string, entity: any, pruned = false): void { + const repo = this._conn.getRepository(entityName); + const tableName = repo.metadata.tableName; + + if (pruned) { + let entityIdMap = this._cachedEntities.latestPrunedEntities.get(tableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(entity.id, _.cloneDeep(entity)); + this._cachedEntities.latestPrunedEntities.set(tableName, entityIdMap); + return; + } + + const frothyBlock = this._cachedEntities.frothyBlocks.get(entity.blockHash); + + // Update frothyBlock only if already present in cache. + // Might not be present when event processing starts without block processing on job retry. + if (frothyBlock) { + let entityIdMap = frothyBlock.entities.get(tableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(entity.id, _.cloneDeep(entity)); + frothyBlock.entities.set(tableName, entityIdMap); + } + } + async getBlocksAtHeight (height: number, isPruned: boolean) { const repo: Repository = this._conn.getRepository('block_progress'); diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index d9a89b70..6352ebc6 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -15,7 +15,7 @@ import debug from 'debug'; import { BaseProvider } from '@ethersproject/providers'; import loader from '@vulcanize/assemblyscript/lib/loader'; -import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp } from '@cerc-io/util'; +import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp, eventProcessingLoadEntityCount } from '@cerc-io/util'; import { TypeId, Level } from './types'; import { @@ -74,6 +74,7 @@ export const instantiate = async ( const entityId = __getString(id); assert(context.block); + eventProcessingLoadEntityCount.inc(); const entityData = await database.getEntity(entityName, entityId, context.block.blockHash); if (!entityData) { @@ -95,7 +96,8 @@ export const instantiate = async ( assert(context.block); const dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance); - await database.saveEntity(entityName, dbData); + const dbEntity = await database.saveEntity(entityName, dbData); + database.cacheUpdatedEntity(entityName, dbEntity); // Update the in-memory subgraph state if not disabled. if (!indexer.serverConfig.disableSubgraphState) { diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 1f8f075e..787aebf9 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -8,10 +8,11 @@ import debug from 'debug'; import path from 'path'; import fs from 'fs'; import { ContractInterface, utils, providers } from 'ethers'; +import { SelectionNode } from 'graphql'; import { ResultObject } from '@vulcanize/assemblyscript/lib/loader'; import { EthClient } from '@cerc-io/ipld-eth-client'; -import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IPLDIndexerInterface } from '@cerc-io/util'; +import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IPLDIndexerInterface, BlockProgressInterface } from '@cerc-io/util'; import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils'; import { Context, GraphData, instantiate } from './loader'; @@ -257,12 +258,18 @@ export class GraphWatcher { this._indexer = indexer; } - async getEntity (entity: new () => Entity, id: string, relationsMap: Map, block?: BlockHeight): Promise { + async getEntity ( + entity: new () => Entity, + id: string, + relationsMap: Map, + block: BlockHeight, + selections: ReadonlyArray = [] + ): Promise { const dbTx = await this._database.createTransactionRunner(); try { // Get entity from the database. - const result = await this._database.getEntityWithRelations(dbTx, entity, id, relationsMap, block); + const result = await this._database.getEntityWithRelations(dbTx, entity, id, relationsMap, block, selections); await dbTx.commitTransaction(); // Resolve any field name conflicts in the entity result. @@ -275,7 +282,14 @@ export class GraphWatcher { } } - async getEntities (entity: new () => Entity, relationsMap: Map, block: BlockHeight, where: { [key: string]: any } = {}, queryOptions: QueryOptions): Promise { + async getEntities ( + entity: new () => Entity, + relationsMap: Map, + block: BlockHeight, + where: { [key: string]: any } = {}, + queryOptions: QueryOptions, + selections: ReadonlyArray = [] + ): Promise { const dbTx = await this._database.createTransactionRunner(); try { @@ -313,7 +327,7 @@ export class GraphWatcher { } // Get entities from the database. - const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions); + const entities = await this._database.getEntities(dbTx, entity, relationsMap, block, where, queryOptions, selections); await dbTx.commitTransaction(); // Resolve any field name conflicts in the entity result. @@ -350,6 +364,49 @@ export class GraphWatcher { } } + updateEntityCacheFrothyBlocks (blockProgress: BlockProgressInterface): void { + // Set latest block in frothy region to cachedEntities.frothyBlocks map. + if (!this._database.cachedEntities.frothyBlocks.has(blockProgress.blockHash)) { + this._database.cachedEntities.frothyBlocks.set( + blockProgress.blockHash, + { + blockNumber: blockProgress.blockNumber, + parentHash: blockProgress.parentHash, + entities: new Map() + } + ); + + log(`Size of cachedEntities.frothyBlocks map: ${this._database.cachedEntities.frothyBlocks.size}`); + } + } + + pruneEntityCacheFrothyBlocks (canonicalBlockHash: string, canonicalBlockNumber: number) { + const canonicalBlock = this._database.cachedEntities.frothyBlocks.get(canonicalBlockHash); + + if (canonicalBlock) { + // Update latestPrunedEntities map with entities from latest canonical block. + canonicalBlock.entities.forEach((entityIdMap, entityTableName) => { + entityIdMap.forEach((data, id) => { + let entityIdMap = this._database.cachedEntities.latestPrunedEntities.get(entityTableName); + + if (!entityIdMap) { + entityIdMap = new Map(); + } + + entityIdMap.set(id, data); + this._database.cachedEntities.latestPrunedEntities.set(entityTableName, entityIdMap); + }); + }); + } + + // Remove pruned blocks from frothyBlocks. + const prunedBlockHashes = Array.from(this._database.cachedEntities.frothyBlocks.entries()) + .filter(([, value]) => value.blockNumber <= canonicalBlockNumber) + .map(([blockHash]) => blockHash); + + prunedBlockHashes.forEach(blockHash => this._database.cachedEntities.frothyBlocks.delete(blockHash)); + } + /** * Method to reinstantiate WASM instance for specified dataSource. * @param dataSourceName diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 2f1643a8..509ee1ee 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -8,6 +8,7 @@ import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import _ from 'lodash'; +import { SelectionNode } from 'graphql'; import { JsonFragment } from '@ethersproject/abi'; import { BaseProvider } from '@ethersproject/providers'; @@ -299,12 +300,14 @@ export class Indexer implements IPLDIndexerInterface { return createStateCheckpoint(this, contractAddress, blockHash); } - async processCanonicalBlock (blockHash: string): Promise { + async processCanonicalBlock (blockHash: string, blockNumber: number): Promise { // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); // Call custom stateDiff hook. await createStateDiff(this, blockHash); + + this._graphWatcher.pruneEntityCacheFrothyBlocks(blockHash, blockNumber); } async processCheckpoint (blockHash: string): Promise { @@ -335,6 +338,10 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getIPLDBlockByCid(cid); } + async getIPLDBlocks (where: FindConditions): Promise { + return this._db.getIPLDBlocks(where); + } + getIPLDData (ipldBlock: IPLDBlock): any { return this._baseIndexer.getIPLDData(ipldBlock); } @@ -380,8 +387,13 @@ export class Indexer implements IPLDIndexerInterface { await this._baseIndexer.removeIPLDBlocks(blockNumber, kind); } - async getSubgraphEntity (entity: new () => Entity, id: string, block: BlockHeight): Promise { - const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block); + async getSubgraphEntity ( + entity: new () => Entity, + id: string, + block: BlockHeight, + selections: ReadonlyArray = [] + ): Promise { + const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block, selections); return data; } @@ -401,9 +413,11 @@ export class Indexer implements IPLDIndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (blockHash: string, blockNumber: number): Promise { + async processBlock (blockProgress: BlockProgress): Promise { // Call a function to create initial state for contracts. - await this._baseIndexer.createInit(this, blockHash, blockNumber); + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); + + this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress); } async processBlockAfterEvents (blockHash: string): Promise { @@ -411,7 +425,7 @@ export class Indexer implements IPLDIndexerInterface { await this._graphWatcher.handleBlock(blockHash); // Persist subgraph state to the DB. - await this._dumpSubgraphState(blockHash); + await this.dumpSubgraphState(blockHash); } parseEventNameAndArgs (kind: string, logObj: any): any { @@ -535,7 +549,7 @@ export class Indexer implements IPLDIndexerInterface { } async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { - return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber); + return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber, this._serverConfig.maxEventsBlockRange); } async getSyncStatus (): Promise { @@ -613,6 +627,23 @@ export class Indexer implements IPLDIndexerInterface { this._subgraphStateMap.set(contractAddress, updatedData); } + async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise { + // Create a diff for each contract in the subgraph state map. + const createDiffPromises = Array.from(this._subgraphStateMap.entries()) + .map(([contractAddress, data]): Promise => { + if (isStateFinalized) { + return this.createDiff(contractAddress, blockHash, data); + } + + return this.createDiffStaged(contractAddress, blockHash, data); + }); + + await Promise.all(createDiffPromises); + + // Reset the subgraph state map. + this._subgraphStateMap.clear(); + } + _populateEntityTypesMap (): void { this._entityTypesMap.set( 'Author', @@ -674,19 +705,6 @@ export class Indexer implements IPLDIndexerInterface { }); } - async _dumpSubgraphState (blockHash: string): Promise { - // Create a diff for each contract in the subgraph state map. - const createDiffPromises = Array.from(this._subgraphStateMap.entries()) - .map(([contractAddress, data]): Promise => { - return this.createDiffStaged(contractAddress, blockHash, data); - }); - - await Promise.all(createDiffPromises); - - // Reset the subgraph state map. - this._subgraphStateMap.clear(); - } - async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 7be46d6a..240b3f18 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -102,7 +102,7 @@ export class JobRunner { } // Process the hooks for the given block number. - await this._indexer.processCanonicalBlock(blockHash); + await this._indexer.processCanonicalBlock(blockHash, blockNumber); // Update the IPLD status. await this._indexer.updateIPLDStatusHooksBlock(blockNumber); diff --git a/packages/graph-test-watcher/src/resolvers.ts b/packages/graph-test-watcher/src/resolvers.ts index 56ab9324..0c3ec210 100644 --- a/packages/graph-test-watcher/src/resolvers.ts +++ b/packages/graph-test-watcher/src/resolvers.ts @@ -6,9 +6,9 @@ import assert from 'assert'; import BigInt from 'apollo-type-bigint'; import debug from 'debug'; import Decimal from 'decimal.js'; -import { GraphQLScalarType } from 'graphql'; +import { GraphQLResolveInfo, GraphQLScalarType } from 'graphql'; -import { ValueResult, BlockHeight, jsonBigIntStringReplacer } from '@cerc-io/util'; +import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigIntStringReplacer } from '@cerc-io/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; @@ -64,34 +64,66 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch Query: { getMethod: (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }): Promise => { log('getMethod', blockHash, contractAddress); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('getMethod').inc(1); + return indexer.getMethod(blockHash, contractAddress); }, _test: (_: any, { blockHash, contractAddress }: { blockHash: string, contractAddress: string }): Promise => { log('_test', blockHash, contractAddress); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('_test').inc(1); + return indexer._test(blockHash, contractAddress); }, - blog: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }): Promise => { + blog: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('blog', id, JSON.stringify(block, jsonBigIntStringReplacer)); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('blog').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Blog, id, block); + return indexer.getSubgraphEntity(Blog, id, block, info.fieldNodes[0].selectionSet.selections); }, - category: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }): Promise => { + category: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('category', id, JSON.stringify(block, jsonBigIntStringReplacer)); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('category').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Category, id, block); + return indexer.getSubgraphEntity(Category, id, block, info.fieldNodes[0].selectionSet.selections); }, - author: async (_: any, { id, block = {} }: { id: string, block: BlockHeight }): Promise => { + author: async ( + _: any, + { id, block = {} }: { id: string, block: BlockHeight }, + __: any, + info: GraphQLResolveInfo + ) => { log('author', id, JSON.stringify(block, jsonBigIntStringReplacer)); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('author').inc(1); + assert(info.fieldNodes[0].selectionSet); - return indexer.getSubgraphEntity(Author, id, block); + return indexer.getSubgraphEntity(Author, id, block, info.fieldNodes[0].selectionSet.selections); }, events: async (_: any, { blockHash, contractAddress, name }: { blockHash: string, contractAddress: string, name?: string }) => { log('events', blockHash, contractAddress, name); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('events').inc(1); const block = await indexer.getBlockProgress(blockHash); if (!block || !block.isComplete) { @@ -104,6 +136,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch eventsInRange: async (_: any, { fromBlockNumber, toBlockNumber }: { fromBlockNumber: number, toBlockNumber: number }) => { log('eventsInRange', fromBlockNumber, toBlockNumber); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('eventsInRange').inc(1); const { expected, actual } = await indexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); if (expected !== actual) { @@ -116,6 +150,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch getStateByCID: async (_: any, { cid }: { cid: string }) => { log('getStateByCID', cid); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('getStateByCID').inc(1); const ipldBlock = await indexer.getIPLDBlockByCid(cid); @@ -124,6 +160,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch getState: async (_: any, { blockHash, contractAddress, kind }: { blockHash: string, contractAddress: string, kind: string }) => { log('getState', blockHash, contractAddress, kind); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('getState').inc(1); const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind); @@ -132,6 +170,8 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch getSyncStatus: async () => { log('getSyncStatus'); + gqlTotalQueryCount.inc(1); + gqlQueryCount.labels('getSyncStatus').inc(1); return indexer.getSyncStatus(); } diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index fc4847f0..a753d0ab 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -521,9 +521,9 @@ export class Indexer implements IPLDIndexerInterface { await this.triggerIndexingOnEvent(event); } - async processBlock (blockHash: string, blockNumber: number): Promise { + async processBlock (blockProgress: BlockProgress): Promise { // Call a function to create initial state for contracts. - await this._baseIndexer.createInit(this, blockHash, blockNumber); + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); } parseEventNameAndArgs (kind: string, logObj: any): any { diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index b7c21132..99b6ae5e 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -455,24 +455,26 @@ export class Database { if (id) { // Entity found in frothy region. findOptions.where.blockHash = blockHash; - } else { - // If entity not found in frothy region get latest entity in the pruned region. - // Filter out entities from pruned blocks. - const canonicalBlockNumber = blockNumber + 1; - const entityInPrunedRegion:any = await repo.createQueryBuilder('entity') - .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') - .where('block.is_pruned = false') - .andWhere('entity.id = :id', { id: findOptions.where.id }) - .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) - .orderBy('entity.block_number', 'DESC') - .limit(1) - .getOne(); - - findOptions.where.blockHash = entityInPrunedRegion?.blockHash; + return repo.findOne(findOptions); } - return repo.findOne(findOptions); + return this.getLatestPrunedEntity(repo, findOptions.where.id, blockNumber + 1); + } + + async getLatestPrunedEntity (repo: Repository, id: string, canonicalBlockNumber: number): Promise { + // Filter out latest entity from pruned blocks. + + const entityInPrunedRegion = await repo.createQueryBuilder('entity') + .innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash') + .where('block.is_pruned = false') + .andWhere('entity.id = :id', { id }) + .andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('entity.block_number', 'DESC') + .limit(1) + .getOne(); + + return entityInPrunedRegion; } async getFrothyRegion (queryRunner: QueryRunner, blockHash: string): Promise<{ canonicalBlockNumber: number, blockHashes: string[] }> { diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index de6941cc..17ee7f3a 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -42,7 +42,7 @@ export const indexBlock = async ( } assert(indexer.processBlock); - await indexer.processBlock(blockProgress.blockHash, blockProgress.blockNumber); + await indexer.processBlock(blockProgress); await processBatchEvents(indexer, blockProgress, eventsInBatch); } diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 1c52289a..68ecf4da 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -251,7 +251,7 @@ export class JobRunner { } if (this._indexer.processBlock) { - await this._indexer.processBlock(blockHash, blockNumber); + await this._indexer.processBlock(blockProgress); } // Push job to event processing queue. diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index 4d4f2785..c504d2e3 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -53,6 +53,21 @@ export const eventCount = new client.Gauge({ help: 'Total entries in event table' }); +export const eventProcessingLoadEntityCount = new client.Gauge({ + name: 'event_processing_load_entity_total', + help: 'Total load entities in a single event processing' +}); + +export const eventProcessingLoadEntityCacheHitCount = new client.Gauge({ + name: 'event_processing_load_entity_cache_hit_total', + help: 'Total load entities hitting cache in a single event processing' +}); + +export const eventProcessingLoadEntityDBQueryDuration = new client.Histogram({ + name: 'event_processing_load_entity_db_query_seconds', + help: 'Duration of DB query made in event processing' +}); + // Export metrics on a server const app: Application = express(); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 6c1f4a25..5c585825 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -110,7 +110,7 @@ export interface IndexerInterface { createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise processInitialState?: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise - processBlock?: (blockHash: string, blockNumber: number) => Promise + processBlock?: (blockProgres: BlockProgressInterface) => Promise processBlockAfterEvents?: (blockHash: string) => Promise getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise updateSubgraphState?: (contractAddress: string, data: any) => void