Add support for meta query in watcher GQL API (#453)

* Add a method to get meta data for watcher indexing status

* Add a flag indicating indexing error to sync status

* Codegen changes

* Clear indexing error on job-runner startup

* Fix lint errors
This commit is contained in:
prathamesh0 2023-11-07 14:37:05 +05:30 committed by GitHub
parent 8547876764
commit 5efab298a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 220 additions and 53 deletions

View File

@ -115,6 +115,7 @@ export class JobRunnerCmd {
// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
await jobRunner.jobQueue.deleteAllJobs('completed');
await jobRunner.resetToPrevIndexedBlock();
await indexer.updateSyncStatusIndexingError(false);
await startJobRunner(jobRunner);
jobRunner.handleShutdown();

View File

@ -49,6 +49,13 @@ columns:
pgType: integer
tsType: number
columnType: Column
- name: hasIndexingError
pgType: boolean
tsType: boolean
columnType: Column
columnOptions:
- option: default
value: false
imports:
- toImport:
- Entity

View File

@ -3,7 +3,7 @@
//
import assert from 'assert';
import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode } from 'graphql';
import { GraphQLSchema, parse, printSchema, print, GraphQLDirective, GraphQLInt, GraphQLBoolean, GraphQLEnumType, DefinitionNode, GraphQLString, GraphQLNonNull } from 'graphql';
import { ObjectTypeComposer, NonNullComposer, ObjectTypeComposerDefinition, ObjectTypeComposerFieldConfigMapDefinition, SchemaComposer } from 'graphql-compose';
import { Writable } from 'stream';
import { utils } from 'ethers';
@ -98,13 +98,16 @@ export class Schema {
// Add a mutation for watching a contract.
this._addWatchContractMutation();
// Add type and query for SyncStatus.
this._addSyncStatus();
// Add State type and queries.
this._addStateType();
this._addStateQuery();
// Add type and query for SyncStatus.
this._addSyncStatus();
// Add type and query for meta data
this._addMeta();
// Build the schema.
return this._composer.buildSchema();
}
@ -269,7 +272,7 @@ export class Schema {
typeComposer = this._composer.createObjectTC({
name: '_Block_',
fields: {
cid: 'String!',
cid: 'String',
hash: 'String!',
number: 'Int!',
timestamp: 'Int!',
@ -456,6 +459,28 @@ export class Schema {
});
}
_addMeta (): void {
const typeComposer = this._composer.createObjectTC({
name: '_Meta_',
fields: {
block: this._composer.getOTC('_Block_').NonNull,
deployment: { type: new GraphQLNonNull(GraphQLString) },
hasIndexingErrors: { type: new GraphQLNonNull(GraphQLBoolean) }
}
});
this._composer.addSchemaMustHaveType(typeComposer);
this._composer.Query.addFields({
_meta: {
type: this._composer.getOTC('_Meta_'),
args: {
block: BlockHeight
}
}
});
}
_addStateType (): void {
const typeComposer = this._composer.createObjectTC({
name: 'ResultState',

View File

@ -259,6 +259,12 @@ export class Database implements DatabaseInterface {
return this._baseDatabase.forceUpdateSyncStatus(repo, blockHash, blockNumber);
}
async updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatus> {
const repo = queryRunner.manager.getRepository(SyncStatus);
return this._baseDatabase.updateSyncStatusIndexingError(repo, hasIndexingError);
}
async getSyncStatus (queryRunner: QueryRunner): Promise<SyncStatus | undefined> {
const repo = queryRunner.manager.getRepository(SyncStatus);

View File

@ -43,7 +43,8 @@ import {
DatabaseInterface,
Clients,
EthClient,
UpstreamConfig
UpstreamConfig,
ResultMeta
} from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node';
@ -197,6 +198,10 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.fetchStateStatus();
}
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
return this._baseIndexer.getMetaData(block);
}
getResultEvent (event: Event): ResultEvent {
return getResultEvent(event);
}
@ -660,6 +665,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.forceUpdateSyncStatus(blockHash, blockNumber);
}
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatus> {
return this._baseIndexer.updateSyncStatusIndexingError(hasIndexingError);
}
async getEvent (id: string): Promise<Event | undefined> {
return this._baseIndexer.getEvent(id);
}

View File

@ -168,6 +168,17 @@ export const createResolvers = async (indexerArg: IndexerInterface, eventWatcher
gqlQueryCount.labels('getSyncStatus').inc(1);
return indexer.getSyncStatus();
},
_meta: async (
_: any,
{ block = {} }: { block: BlockHeight }
) => {
log('_meta');
gqlTotalQueryCount.inc(1);
gqlQueryCount.labels('_meta').inc(1);
return indexer.getMetaData(block);
}
}
};

View File

@ -177,6 +177,12 @@ export class Indexer implements IndexerInterface {
return {} as SyncStatusInterface;
}
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
assert(hasIndexingError);
return {} as SyncStatusInterface;
}
async markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void> {
assert(blocks);

View File

@ -202,6 +202,15 @@ export class Database {
return await repo.save(entity);
}
async updateSyncStatusIndexingError (repo: Repository<SyncStatusInterface>, hasIndexingError: boolean): Promise<SyncStatusInterface> {
const entity = await repo.findOne();
assert(entity);
entity.hasIndexingError = hasIndexingError;
return repo.save(entity);
}
async getBlockProgress (repo: Repository<BlockProgressInterface>, blockHash: string): Promise<BlockProgressInterface | undefined> {
return repo.findOne({ where: { blockHash } });
}
@ -1099,7 +1108,6 @@ export class Database {
eventCount.set(res);
}
// TODO: Transform in the GQL type BigInt parsing itself
_transformBigValues (value: any): any {
// Handle array of bigints
if (Array.isArray(value)) {

View File

@ -9,8 +9,8 @@ import PgBoss from 'pg-boss';
import { constants } from 'ethers';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, JOB_KIND_EVENTS, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
import { BlockProgressInterface, EventInterface, IndexerInterface, EthClient, EventsJobData, EventsQueueJobKind } from './types';
import { MAX_REORG_DEPTH, JOB_KIND_PRUNE, JOB_KIND_INDEX, UNKNOWN_EVENT_NAME, QUEUE_BLOCK_PROCESSING, QUEUE_EVENT_PROCESSING, QUEUE_HISTORICAL_PROCESSING } from './constants';
import { createPruningJob, processBlockByNumber } from './common';
import { OrderDirection } from './database';
import { HistoricalJobData, HistoricalJobResponseData } from './job-runner';
@ -258,27 +258,26 @@ export class EventWatcher {
);
}
async eventProcessingCompleteHandler (job: any): Promise<void> {
const { id, data: { request, failed, state, createdOn } } = job;
async eventProcessingCompleteHandler (job: PgBoss.Job<any>): Promise<void> {
const { id, data: { request: { data }, failed, state, createdOn } } = job;
if (failed) {
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
return;
}
const { data: { kind, blockHash, publish } } = request;
// Ignore jobs other than JOB_KIND_EVENTS
if (kind !== JOB_KIND_EVENTS) {
// Ignore jobs other than event processsing
const { kind } = data;
if (kind !== EventsQueueJobKind.EVENTS) {
return;
}
const { blockHash, publish }: EventsJobData = data;
// Check if publish is set to true
// Events and blocks are not published in historical processing
// GQL subscription events will not be triggered if publish is set to false
if (publish) {
assert(blockHash);
const blockProgress = await this._indexer.getBlockProgress(blockHash);
assert(blockProgress);
@ -303,7 +302,7 @@ export class EventWatcher {
// TODO: Use a different pubsub to publish event from job-runner.
// https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
for (const dbEvent of dbEvents) {
log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`);
log(`Job onComplete event ${dbEvent.id} publish ${publish}`);
if (!failed && state === 'completed') {
// Check for max acceptable lag time between request and sending results to live subscribers.

View File

@ -22,11 +22,13 @@ import {
SyncStatusInterface,
StateInterface,
StateKind,
EthClient
EthClient,
ContractJobData,
EventsQueueJobKind
} from './types';
import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants';
import { UNKNOWN_EVENT_NAME, QUEUE_EVENT_PROCESSING, DIFF_MERGE_BATCH_SIZE } from './constants';
import { JobQueue } from './job-queue';
import { Where, QueryOptions } from './database';
import { Where, QueryOptions, BlockHeight } from './database';
import { ServerConfig, UpstreamConfig } from './config';
import { createOrUpdateStateData, StateDataMeta } from './state-helper';
@ -88,6 +90,18 @@ export type ResultEvent = {
proof: string;
};
export type ResultMeta = {
block: {
cid: string | null;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
deployment: string;
hasIndexingErrors: boolean;
};
export class Indexer {
_serverConfig: ServerConfig;
_upstreamConfig: UpstreamConfig;
@ -131,6 +145,40 @@ export class Indexer {
}, {});
}
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
let resultBlock: BlockProgressInterface | undefined;
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
if (block.hash) {
resultBlock = await this.getBlockProgress(block.hash);
} else {
const blockHeight = block.number ? block.number : syncStatus.latestIndexedBlockNumber - 1;
// Get all the blocks at a height
const blocksAtHeight = await this.getBlocksAtHeight(blockHeight, false);
if (blocksAtHeight.length) {
resultBlock = blocksAtHeight[0];
}
}
return resultBlock
? {
block: {
cid: resultBlock.cid,
number: resultBlock.blockNumber,
hash: resultBlock.blockHash,
timestamp: resultBlock.blockTimestamp,
parentHash: resultBlock.parentHash
},
deployment: '',
hasIndexingErrors: syncStatus.hasIndexingError
}
: null;
}
async getSyncStatus (): Promise<SyncStatusInterface | undefined> {
const dbTx = await this._db.createTransactionRunner();
let res;
@ -216,6 +264,23 @@ export class Indexer {
return res;
}
async updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.updateSyncStatusIndexingError(dbTx, hasIndexingError);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBlocks (blockFilter: { blockNumber?: number, blockHash?: string }): Promise<any> {
assert(blockFilter.blockHash || blockFilter.blockNumber);
const result = await this._ethClient.getBlocks(blockFilter);
@ -760,12 +825,10 @@ export class Indexer {
this.cacheContract(contract);
await dbTx.commitTransaction();
const contractJob: ContractJobData = { kind: EventsQueueJobKind.CONTRACT, contract };
await this._jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
{
kind: JOB_KIND_CONTRACT,
contract
},
contractJob,
{ priority: 1 }
);
} catch (error) {

View File

@ -12,8 +12,6 @@ import { JobQueueConfig } from './config';
import {
JOB_KIND_INDEX,
JOB_KIND_PRUNE,
JOB_KIND_EVENTS,
JOB_KIND_CONTRACT,
MAX_REORG_DEPTH,
QUEUE_BLOCK_PROCESSING,
QUEUE_EVENT_PROCESSING,
@ -22,7 +20,7 @@ import {
QUEUE_HISTORICAL_PROCESSING
} from './constants';
import { JobQueue } from './job-queue';
import { BlockProgressInterface, EventInterface, IndexerInterface } from './types';
import { BlockProgressInterface, ContractJobData, EventInterface, EventsJobData, EventsQueueJobKind, IndexerInterface } from './types';
import { wait } from './misc';
import {
createPruningJob,
@ -194,16 +192,17 @@ export class JobRunner {
);
// Push event processing job for each block
const pushJobForBlockPromises = blocks.map(async block => this.jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
{
kind: JOB_KIND_EVENTS,
const pushJobForBlockPromises = blocks.map(async block => {
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: block.blockHash,
isRetryAttempt: false,
// Avoid publishing GQL subscription event in historical processing
// Publishing when realtime processing is listening to events will cause problems
publish: false
}
));
};
this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
});
await Promise.all(pushJobForBlockPromises);
this._historicalProcessingCompletedUpto = endBlock;
@ -214,21 +213,17 @@ export class JobRunner {
);
}
async processEvent (job: any): Promise<EventInterface | void> {
const { data: { kind } } = job;
async processEvent (job: PgBoss.JobWithDoneCallback<EventsJobData | ContractJobData, any>): Promise<EventInterface | void> {
const { data: jobData } = job;
switch (kind) {
case JOB_KIND_EVENTS:
await this._processEvents(job);
switch (jobData.kind) {
case EventsQueueJobKind.EVENTS:
await this._processEvents(jobData);
break;
case JOB_KIND_CONTRACT:
case EventsQueueJobKind.CONTRACT:
this._updateWatchedContracts(job);
break;
default:
log(`Invalid Job kind ${kind} in QUEUE_EVENT_PROCESSING.`);
break;
}
await this.jobQueue.markComplete(job);
@ -532,14 +527,20 @@ export class JobRunner {
// Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents.
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
const eventsProcessingJob: EventsJobData = {
kind: EventsQueueJobKind.EVENTS,
blockHash: blockProgress.blockHash,
isRetryAttempt: false,
publish: true
};
await this.jobQueue.pushJob(QUEUE_EVENT_PROCESSING, eventsProcessingJob);
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
}
async _processEvents (job: any): Promise<void> {
const { blockHash } = job.data;
async _processEvents (jobData: EventsJobData): Promise<void> {
const { blockHash, isRetryAttempt } = jobData;
try {
if (!this._blockAndEventsMap.has(blockHash)) {
@ -580,7 +581,7 @@ export class JobRunner {
if (this._historicalProcessingCompletedUpto && this._historicalProcessingCompletedUpto > block.blockNumber) {
const nextBlockNumberToProcess = block.blockNumber + 1;
// Push a new job to restart historical blocks processing afyre current block
// Push a new job to restart historical blocks processing after current block
log('New contract added in historical processing with filterLogsByAddresses set to true');
await this.jobQueue.pushJob(
QUEUE_HISTORICAL_PROCESSING,
@ -599,6 +600,12 @@ export class JobRunner {
this._endBlockProcessTimer = lastBlockProcessDuration.startTimer();
// If this was a retry attempt, unset the indexing error flag in sync status
if (isRetryAttempt) {
await this._indexer.updateSyncStatusIndexingError(false);
}
// TODO: Shutdown after job gets marked as complete
if (this._shutDown) {
log(`Graceful shutdown after processing block ${block.blockNumber}`);
this.jobQueue.stop();
@ -608,18 +615,22 @@ export class JobRunner {
log(`Error in processing events for block ${blockHash}`);
log(error);
// Set the indexing error flag in sync status
await this._indexer.updateSyncStatusIndexingError(true);
// TODO: Remove processed entities for current block to avoid reprocessing of events
// Catch event processing error and push to job queue after some time with higher priority
log(`Retrying event processing after ${EVENTS_PROCESSING_RETRY_WAIT} ms`);
await wait(EVENTS_PROCESSING_RETRY_WAIT);
// TODO: Stop next job in queue from processing next
// TODO: Stop job for next block in queue (in historical processing)
const eventsProcessingRetryJob: EventsJobData = { ...jobData, isRetryAttempt: true };
await this.jobQueue.pushJob(
QUEUE_EVENT_PROCESSING,
job.data,
{
priority: 1
}
eventsProcessingRetryJob,
{ priority: 1 }
);
}
}

View File

@ -9,6 +9,7 @@ import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { ServerConfig, UpstreamConfig } from './config';
import { Where, QueryOptions, Database } from './database';
import { ValueResult, StateStatus } from './indexer';
import { JOB_KIND_CONTRACT, JOB_KIND_EVENTS } from './constants';
export enum StateKind {
Diff = 'diff',
@ -42,6 +43,7 @@ export interface SyncStatusInterface {
latestCanonicalBlockNumber: number;
initialIndexedBlockHash: string;
initialIndexedBlockNumber: number;
hasIndexingError: boolean;
}
export interface StateSyncStatusInterface {
@ -106,6 +108,7 @@ export interface IndexerInterface {
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
forceUpdateSyncStatus (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
updateSyncStatusIndexingError (hasIndexingError: boolean): Promise<SyncStatusInterface>
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
@ -169,6 +172,7 @@ export interface DatabaseInterface {
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
forceUpdateSyncStatus (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
updateSyncStatusIndexingError (queryRunner: QueryRunner, hasIndexingError: boolean): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, events: DeepPartial<EventInterface>[]): Promise<void>;
saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
@ -240,3 +244,20 @@ export type Clients = {
ethClient: EthClient;
[key: string]: any;
}
export enum EventsQueueJobKind {
EVENTS = JOB_KIND_EVENTS,
CONTRACT = JOB_KIND_CONTRACT
}
export interface EventsJobData {
kind: EventsQueueJobKind.EVENTS;
blockHash: string;
isRetryAttempt: boolean;
publish: boolean;
}
export interface ContractJobData {
kind: EventsQueueJobKind.CONTRACT;
contract: ContractInterface;
}