From 44b3fd59e854a53971a101b019d7d91f7765ea14 Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 18 Nov 2021 18:03:36 +0530 Subject: [PATCH] Add watched contract from subgraph yaml on startup (#56) --- packages/eden-watcher/src/cli/checkpoint.ts | 14 ++++- packages/eden-watcher/src/cli/export-state.ts | 15 ++++-- packages/eden-watcher/src/cli/import-state.ts | 11 ++-- packages/eden-watcher/src/cli/inspect-cid.ts | 13 ++++- .../eden-watcher/src/cli/reset-cmds/state.ts | 13 ++++- .../eden-watcher/src/cli/watch-contract.ts | 15 +++++- packages/eden-watcher/src/database.ts | 25 ++++----- packages/eden-watcher/src/events.ts | 24 +++++---- packages/eden-watcher/src/fill.ts | 17 +++--- packages/eden-watcher/src/indexer.ts | 46 +++++++--------- packages/eden-watcher/src/job-runner.ts | 27 ++++------ packages/eden-watcher/src/resolvers.ts | 5 +- packages/eden-watcher/src/server.ts | 9 ++-- .../erc20-watcher/src/cli/watch-contract.ts | 3 +- packages/erc20-watcher/src/indexer.ts | 6 +-- packages/erc20-watcher/src/resolvers.ts | 3 +- packages/erc20-watcher/src/utils/index.ts | 2 + .../test/subgraph/example1/subgraph.yaml | 1 + .../graph-test-watcher/src/cli/checkpoint.ts | 14 ++++- .../src/cli/export-state.ts | 15 ++++-- .../src/cli/import-state.ts | 11 ++-- .../graph-test-watcher/src/cli/inspect-cid.ts | 13 ++++- .../src/cli/reset-cmds/state.ts | 13 ++++- .../src/cli/watch-contract.ts | 15 +++++- packages/graph-test-watcher/src/database.ts | 25 ++++----- packages/graph-test-watcher/src/events.ts | 24 +++++---- packages/graph-test-watcher/src/fill.ts | 17 +++--- packages/graph-test-watcher/src/indexer.ts | 54 ++++++++----------- packages/graph-test-watcher/src/job-runner.ts | 24 ++++----- packages/graph-test-watcher/src/resolvers.ts | 12 ++++- packages/graph-test-watcher/src/schema.gql | 10 +++- packages/graph-test-watcher/src/server.ts | 9 ++-- packages/util/src/indexer.ts | 6 ++- packages/util/src/types.ts | 2 +- yarn.lock | 19 ++++--- 35 files changed, 314 insertions(+), 218 deletions(-) diff --git a/packages/eden-watcher/src/cli/checkpoint.ts b/packages/eden-watcher/src/cli/checkpoint.ts index 5d48dacd..f6f03b32 100644 --- a/packages/eden-watcher/src/cli/checkpoint.ts +++ b/packages/eden-watcher/src/cli/checkpoint.ts @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -50,7 +51,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/eden-watcher/src/cli/export-state.ts b/packages/eden-watcher/src/cli/export-state.ts index 73b6e624..8a558af2 100644 --- a/packages/eden-watcher/src/cli/export-state.ts +++ b/packages/eden-watcher/src/cli/export-state.ts @@ -9,7 +9,7 @@ import debug from 'debug'; import fs from 'fs'; import path from 'path'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; @@ -48,7 +48,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); @@ -59,7 +68,7 @@ const main = async (): Promise => { ipldCheckpoints: [] }; - const contracts = await db.getContracts({}); + const contracts = await db.getContracts(); // Get latest canonical block. const block = await indexer.getLatestCanonicalBlock(); diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index b98e5997..433a18df 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -55,10 +55,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -69,6 +65,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); // Import data. @@ -91,7 +92,7 @@ export const main = async (): Promise => { // Fill the Contracts. for (const contract of importData.contracts) { - await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); + await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); } // Get the snapshot block. diff --git a/packages/eden-watcher/src/cli/inspect-cid.ts b/packages/eden-watcher/src/cli/inspect-cid.ts index 7c91e59d..14c1ebf2 100644 --- a/packages/eden-watcher/src/cli/inspect-cid.ts +++ b/packages/eden-watcher/src/cli/inspect-cid.ts @@ -9,7 +9,7 @@ import 'reflect-metadata'; import debug from 'debug'; import util from 'util'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -48,7 +48,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/eden-watcher/src/cli/reset-cmds/state.ts b/packages/eden-watcher/src/cli/reset-cmds/state.ts index 19ec7a16..a14ba5d9 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/state.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/state.ts @@ -7,7 +7,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../../database'; @@ -40,7 +40,16 @@ export const handler = async (argv: any): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/eden-watcher/src/cli/watch-contract.ts b/packages/eden-watcher/src/cli/watch-contract.ts index 8b80c586..c58a4785 100644 --- a/packages/eden-watcher/src/cli/watch-contract.ts +++ b/packages/eden-watcher/src/cli/watch-contract.ts @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -47,6 +48,7 @@ const main = async (): Promise => { }, startingBlock: { type: 'number', + default: 1, describe: 'Starting block' } }).argv; @@ -62,7 +64,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 887099bb..2dadb870 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -191,15 +191,10 @@ export class Database implements DatabaseInterface { return repo.save(entity); } - async getContracts (where: FindConditions): Promise { - const repo = this._conn.getRepository(Contract); - return repo.find({ where }); - } - - async getContract (address: string): Promise { + async getContracts (): Promise { const repo = this._conn.getRepository(Contract); - return this._baseDatabase.getContract(repo, address); + return this._baseDatabase.getContracts(repo); } async createTransactionRunner (): Promise { @@ -223,10 +218,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEventEntity(repo, entity); } - async getBlockEvents (blockHash: string, where: FindConditions): Promise { + async getBlockEvents (blockHash: string, options: FindManyOptions): Promise { const repo = this._conn.getRepository(Event); - return this._baseDatabase.getBlockEvents(repo, blockHash, where); + return this._baseDatabase.getBlockEvents(repo, blockHash, options); } async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { @@ -236,12 +231,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); } - async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Contract); + async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); - return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); - }); + return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); } async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { @@ -291,10 +284,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgress(repo, blockHash); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise { diff --git a/packages/eden-watcher/src/events.ts b/packages/eden-watcher/src/events.ts index 700aee60..0bae9fe6 100644 --- a/packages/eden-watcher/src/events.ts +++ b/packages/eden-watcher/src/events.ts @@ -93,16 +93,22 @@ export class EventWatcher implements EventWatcherInterface { return; } - const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job); - + const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job); const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; - log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); - if (!failed && state === 'completed' && request.data.publish) { - // Check for max acceptable lag time between request and sending results to live subscribers. - if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { - await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); - } else { - log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); + + // Cannot publish individual event as they are processed together in a single job. + // TODO: Use a different pubsub to publish event from job-runner. + // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + for (const dbEvent of dbEvents) { + log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`); + + if (!failed && state === 'completed' && request.data.publish) { + // Check for max acceptable lag time between request and sending results to live subscribers. + if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { + await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); + } else { + log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); + } } } }); diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index 0c3a5408..24069259 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -53,14 +53,6 @@ export const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -70,6 +62,15 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 0b7e450a..acd85d9c 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers'; import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { EventInterface, Indexer as BaseIndexer, IndexerInterface, UNKNOWN_EVENT_NAME, ServerConfig } from '@vulcanize/util'; +import { EventInterface, Indexer as BaseIndexer, IndexerInterface, UNKNOWN_EVENT_NAME, ServerConfig, JobQueue } from '@vulcanize/util'; import { GraphWatcher } from '@vulcanize/graph-node'; import { Database } from './database'; @@ -117,7 +117,7 @@ export class Indexer implements IndexerInterface { _ipfsClient: IPFSClient - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { assert(db); assert(ethClient); assert(postgraphileClient); @@ -127,7 +127,7 @@ export class Indexer implements IndexerInterface { this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); this._graphWatcher = graphWatcher; this._abiMap = new Map(); @@ -281,7 +281,7 @@ export class Indexer implements IndexerInterface { const { data: { blockHash, blockNumber } } = job; // Get all the contracts. - const contracts = await this._db.getContracts({}); + const contracts = await this._db.getContracts(); // For each contract, merge the diff till now to create a checkpoint. for (const contract of contracts) { @@ -976,24 +976,6 @@ export class Indexer implements IndexerInterface { }; } - async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise { - // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address. - // If a contract identifier is passed as address instead, no need to convert to checksum address. - // Customize: use the kind input to filter out non-contract-address input to address. - const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address); - - if (!startingBlock) { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); - - startingBlock = syncStatus.latestIndexedBlockNumber; - } - - await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock); - - return true; - } - async getHookStatus (): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -1035,6 +1017,14 @@ export class Indexer implements IndexerInterface { return this.getBlockProgress(syncStatus.latestCanonicalBlockHash); } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); + } + + async saveEventEntity (dbEvent: Event): Promise { + return this._baseIndexer.saveEventEntity(dbEvent); + } + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -1055,6 +1045,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getSyncStatus(); } + async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise { + return this._baseIndexer.getBlocks(blockFilter); + } + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force); } @@ -1067,10 +1061,6 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); } - async getBlock (blockHash: string): Promise { - return this._baseIndexer.getBlock(blockHash); - } - async getEvent (id: string): Promise { return this._baseIndexer.getEvent(id); } @@ -1099,8 +1089,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index 0a1f8e82..fbc029be 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -63,21 +63,12 @@ export class JobRunner { if (kind === JOB_KIND_INDEX) { await this._indexer.processBlock(blockHash); } - - await this._jobQueue.markComplete(job); }); } async subscribeEventProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - const event = await this._baseJobRunner.processEvent(job); - - const watchedContract = await this._indexer.isWatchedContract(event.contract); - if (watchedContract) { - await this._indexer.processEvent(event); - } - - await this._jobQueue.markComplete(job); + await this._baseJobRunner.processEvent(job); }); } @@ -141,14 +132,6 @@ export const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - - // Watching all the contracts in the subgraph. - await graphWatcher.addContracts(); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -158,6 +141,14 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + // Watching all the contracts in the subgraph. + await graphWatcher.addContracts(); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/eden-watcher/src/resolvers.ts b/packages/eden-watcher/src/resolvers.ts index a993fea6..d959d735 100644 --- a/packages/eden-watcher/src/resolvers.ts +++ b/packages/eden-watcher/src/resolvers.ts @@ -51,10 +51,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Mutation: { - watchContract: (_: any, { address, kind, checkpoint, startingBlock }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { + watchContract: async (_: any, { address, kind, checkpoint, startingBlock = 1 }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { log('watchContract', address, kind, checkpoint, startingBlock); + await indexer.watchContract(address, kind, checkpoint, startingBlock); - return indexer.watchContract(address, kind, checkpoint, startingBlock); + return true; } }, diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index aec430ec..acc268f0 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -51,10 +51,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -64,6 +60,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { diff --git a/packages/erc20-watcher/src/cli/watch-contract.ts b/packages/erc20-watcher/src/cli/watch-contract.ts index 73ec918f..6bc16401 100644 --- a/packages/erc20-watcher/src/cli/watch-contract.ts +++ b/packages/erc20-watcher/src/cli/watch-contract.ts @@ -10,6 +10,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@ import { Database } from '../database'; import { Indexer } from '../indexer'; +import { CONTRACT_KIND } from '../utils/index'; (async () => { const argv = await yargs.parserConfiguration({ @@ -60,7 +61,7 @@ import { Indexer } from '../indexer'; const indexer = new Indexer(db, ethClient, postgraphileClient, ethProvider, jobQueue, mode); - await indexer.watchContract(argv.address, argv.checkpoint, argv.startingBlock); + await indexer.watchContract(argv.address, CONTRACT_KIND, argv.checkpoint, argv.startingBlock); await db.close(); await jobQueue.stop(); diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index a21dead5..f098ffd2 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -29,8 +29,6 @@ const ETH_CALL_MODE = 'eth_call'; const TRANSFER_EVENT = 'Transfer'; const APPROVAL_EVENT = 'Approval'; -const CONTRACT_KIND = 'token'; - interface EventResult { event: { from?: string; @@ -301,8 +299,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.isWatchedContract(address); } - async watchContract (address: string, checkpoint: boolean, startingBlock: number): Promise { - return this._baseIndexer.watchContract(address, CONTRACT_KIND, checkpoint, startingBlock); + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } async saveEventEntity (dbEvent: Event): Promise { diff --git a/packages/erc20-watcher/src/resolvers.ts b/packages/erc20-watcher/src/resolvers.ts index 75e02eb0..846f918a 100644 --- a/packages/erc20-watcher/src/resolvers.ts +++ b/packages/erc20-watcher/src/resolvers.ts @@ -10,6 +10,7 @@ import { ValueResult } from '@vulcanize/util'; import { Indexer } from './indexer'; import { EventWatcher } from './events'; +import { CONTRACT_KIND } from './utils/index'; const log = debug('vulcanize:resolver'); @@ -36,7 +37,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch Mutation: { watchToken: async (_: any, { token, checkpoint = false, startingBlock = 1 }: { token: string, checkpoint: boolean, startingBlock: number }): Promise => { log('watchToken', token, checkpoint, startingBlock); - await indexer.watchContract(token, checkpoint, startingBlock); + await indexer.watchContract(token, CONTRACT_KIND, checkpoint, startingBlock); return true; } diff --git a/packages/erc20-watcher/src/utils/index.ts b/packages/erc20-watcher/src/utils/index.ts index 2e2887c5..89332401 100644 --- a/packages/erc20-watcher/src/utils/index.ts +++ b/packages/erc20-watcher/src/utils/index.ts @@ -10,6 +10,8 @@ import ERC20SymbolBytesABI from '../artifacts/ERC20SymbolBytes.json'; import ERC20NameBytesABI from '../artifacts/ERC20NameBytes.json'; import { StaticTokenDefinition } from './static-token-definition'; +export const CONTRACT_KIND = 'token'; + export const fetchTokenSymbol = async (ethProvider: BaseProvider, blockHash: string, tokenAddress: string): Promise => { const contract = new Contract(tokenAddress, abi, ethProvider); const contractSymbolBytes = new Contract(tokenAddress, ERC20SymbolBytesABI, ethProvider); diff --git a/packages/graph-node/test/subgraph/example1/subgraph.yaml b/packages/graph-node/test/subgraph/example1/subgraph.yaml index 4b2b5dcb..569c08de 100644 --- a/packages/graph-node/test/subgraph/example1/subgraph.yaml +++ b/packages/graph-node/test/subgraph/example1/subgraph.yaml @@ -8,6 +8,7 @@ dataSources: source: address: "" abi: Example1 + startBlock: 100 mapping: kind: ethereum/events apiVersion: 0.0.5 diff --git a/packages/graph-test-watcher/src/cli/checkpoint.ts b/packages/graph-test-watcher/src/cli/checkpoint.ts index 5d48dacd..f6f03b32 100644 --- a/packages/graph-test-watcher/src/cli/checkpoint.ts +++ b/packages/graph-test-watcher/src/cli/checkpoint.ts @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -50,7 +51,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/graph-test-watcher/src/cli/export-state.ts b/packages/graph-test-watcher/src/cli/export-state.ts index 73b6e624..8a558af2 100644 --- a/packages/graph-test-watcher/src/cli/export-state.ts +++ b/packages/graph-test-watcher/src/cli/export-state.ts @@ -9,7 +9,7 @@ import debug from 'debug'; import fs from 'fs'; import path from 'path'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; @@ -48,7 +48,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); @@ -59,7 +68,7 @@ const main = async (): Promise => { ipldCheckpoints: [] }; - const contracts = await db.getContracts({}); + const contracts = await db.getContracts(); // Get latest canonical block. const block = await indexer.getLatestCanonicalBlock(); diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index b98e5997..433a18df 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -55,10 +55,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -69,6 +65,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); // Import data. @@ -91,7 +92,7 @@ export const main = async (): Promise => { // Fill the Contracts. for (const contract of importData.contracts) { - await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); + await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); } // Get the snapshot block. diff --git a/packages/graph-test-watcher/src/cli/inspect-cid.ts b/packages/graph-test-watcher/src/cli/inspect-cid.ts index 7c91e59d..14c1ebf2 100644 --- a/packages/graph-test-watcher/src/cli/inspect-cid.ts +++ b/packages/graph-test-watcher/src/cli/inspect-cid.ts @@ -9,7 +9,7 @@ import 'reflect-metadata'; import debug from 'debug'; import util from 'util'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -48,7 +48,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts index 39efc03e..efd76e28 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts @@ -7,7 +7,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../../database'; @@ -43,7 +43,16 @@ export const handler = async (argv: any): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/graph-test-watcher/src/cli/watch-contract.ts b/packages/graph-test-watcher/src/cli/watch-contract.ts index 8b80c586..c58a4785 100644 --- a/packages/graph-test-watcher/src/cli/watch-contract.ts +++ b/packages/graph-test-watcher/src/cli/watch-contract.ts @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -47,6 +48,7 @@ const main = async (): Promise => { }, startingBlock: { type: 'number', + default: 1, describe: 'Starting block' } }).argv; @@ -62,7 +64,16 @@ const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/graph-test-watcher/src/database.ts b/packages/graph-test-watcher/src/database.ts index 1fd4e106..76e88904 100644 --- a/packages/graph-test-watcher/src/database.ts +++ b/packages/graph-test-watcher/src/database.ts @@ -225,15 +225,10 @@ export class Database implements DatabaseInterface { return repo.save(entity); } - async getContracts (where: FindConditions): Promise { - const repo = this._conn.getRepository(Contract); - return repo.find({ where }); - } - - async getContract (address: string): Promise { + async getContracts (): Promise { const repo = this._conn.getRepository(Contract); - return this._baseDatabase.getContract(repo, address); + return this._baseDatabase.getContracts(repo); } async createTransactionRunner (): Promise { @@ -257,10 +252,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEventEntity(repo, entity); } - async getBlockEvents (blockHash: string, where: FindConditions): Promise { + async getBlockEvents (blockHash: string, options: FindManyOptions): Promise { const repo = this._conn.getRepository(Event); - return this._baseDatabase.getBlockEvents(repo, blockHash, where); + return this._baseDatabase.getBlockEvents(repo, blockHash, options); } async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { @@ -270,12 +265,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); } - async saveContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Contract); + async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); - return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); - }); + return this._baseDatabase.saveContract(repo, address, kind, checkpoint, startingBlock); } async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise { @@ -325,10 +318,10 @@ export class Database implements DatabaseInterface { return this._baseDatabase.getBlockProgress(repo, blockHash); } - async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); - return this._baseDatabase.updateBlockProgress(repo, blockHash, lastProcessedEventIndex); + return this._baseDatabase.updateBlockProgress(repo, block, lastProcessedEventIndex); } async removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise { diff --git a/packages/graph-test-watcher/src/events.ts b/packages/graph-test-watcher/src/events.ts index 700aee60..0bae9fe6 100644 --- a/packages/graph-test-watcher/src/events.ts +++ b/packages/graph-test-watcher/src/events.ts @@ -93,16 +93,22 @@ export class EventWatcher implements EventWatcherInterface { return; } - const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job); - + const dbEvents = await this._baseEventWatcher.eventProcessingCompleteHandler(job); const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000; - log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`); - if (!failed && state === 'completed' && request.data.publish) { - // Check for max acceptable lag time between request and sending results to live subscribers. - if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { - await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); - } else { - log(`event ${request.data.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); + + // Cannot publish individual event as they are processed together in a single job. + // TODO: Use a different pubsub to publish event from job-runner. + // https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + for (const dbEvent of dbEvents) { + log(`Job onComplete event ${dbEvent.id} publish ${!!request.data.publish}`); + + if (!failed && state === 'completed' && request.data.publish) { + // Check for max acceptable lag time between request and sending results to live subscribers. + if (timeElapsedInSeconds <= this._jobQueue.maxCompletionLag) { + await this.publishEventToSubscribers(dbEvent, timeElapsedInSeconds); + } else { + log(`event ${dbEvent.id} is too old (${timeElapsedInSeconds}s), not broadcasting to live subscribers`); + } } } }); diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 0c3a5408..24069259 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -53,14 +53,6 @@ export const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. - // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries - const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -70,6 +62,15 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. + // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries + const pubsub = new PubSub(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); await fillBlocks(jobQueue, indexer, postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, argv); diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 40acfc31..dd5f74cc 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -16,7 +16,7 @@ import { BaseProvider } from '@ethersproject/providers'; import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout } from '@vulcanize/solidity-mapper'; -import { EventInterface, Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType } from '@vulcanize/util'; +import { EventInterface, Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType, JobQueue } from '@vulcanize/util'; import { GraphWatcher } from '@vulcanize/graph-node'; import { Database } from './database'; @@ -87,7 +87,7 @@ export class Indexer implements IndexerInterface { _ipfsClient: IPFSClient - constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, graphWatcher: GraphWatcher) { + constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, postgraphileClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { assert(db); assert(ethClient); assert(postgraphileClient); @@ -97,7 +97,7 @@ export class Indexer implements IndexerInterface { this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider); + this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue); this._graphWatcher = graphWatcher; const { abi, storageLayout } = artifacts; @@ -271,8 +271,10 @@ export class Indexer implements IndexerInterface { const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint'); // There should be an initial checkpoint at least. - // Assumption: There should be no events for the contract at the starting block. - assert(checkpoint, 'Initial checkpoint doesn\'t exist'); + // Return if initial checkpoint doesn't exist. + if (!checkpoint) { + return; + } // Check if the latest checkpoint is in the same block. assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.'); @@ -289,7 +291,7 @@ export class Indexer implements IndexerInterface { const { data: { blockHash, blockNumber } } = job; // Get all the contracts. - const contracts = await this._db.getContracts({}); + const contracts = await this._db.getContracts(); // For each contract, merge the diff till now to create a checkpoint. for (const contract of contracts) { @@ -299,7 +301,7 @@ export class Indexer implements IndexerInterface { const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint'); if (!checkpointBlock) { - if (blockNumber === contract.startingBlock) { + if (blockNumber >= contract.startingBlock) { // Call initial checkpoint hook. await createInitialCheckpoint(this, contract.address, blockHash); } @@ -585,24 +587,6 @@ export class Indexer implements IndexerInterface { }; } - async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock?: number): Promise { - // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address. - // If a contract identifier is passed as address instead, no need to convert to checksum address. - // Customize: use the kind input to filter out non-contract-address input to address. - const formattedAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address); - - if (!startingBlock) { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); - - startingBlock = syncStatus.latestIndexedBlockNumber; - } - - await this._db.saveContract(formattedAddress, kind, checkpoint, startingBlock); - - return true; - } - async getHookStatus (): Promise { const dbTx = await this._db.createTransactionRunner(); let res; @@ -644,6 +628,14 @@ export class Indexer implements IndexerInterface { return this.getBlockProgress(syncStatus.latestCanonicalBlockHash); } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { + return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); + } + + async saveEventEntity (dbEvent: Event): Promise { + return this._baseIndexer.saveEventEntity(dbEvent); + } + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -664,6 +656,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getSyncStatus(); } + async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise { + return this._baseIndexer.getBlocks(blockFilter); + } + async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force); } @@ -676,10 +672,6 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); } - async getBlock (blockHash: string): Promise { - return this._baseIndexer.getBlock(blockHash); - } - async getEvent (id: string): Promise { return this._baseIndexer.getEvent(id); } @@ -708,8 +700,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.markBlocksAsPruned(blocks); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._baseIndexer.updateBlockProgress(blockHash, lastProcessedEventIndex); + async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { + return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index 60087c69..fbc029be 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -63,21 +63,12 @@ export class JobRunner { if (kind === JOB_KIND_INDEX) { await this._indexer.processBlock(blockHash); } - - await this._jobQueue.markComplete(job); }); } async subscribeEventProcessingQueue (): Promise { await this._jobQueue.subscribe(QUEUE_EVENT_PROCESSING, async (job) => { - const event = await this._baseJobRunner.processEvent(job); - - const watchedContract = await this._indexer.isWatchedContract(event.contract); - if (watchedContract) { - await this._indexer.processEvent(event); - } - - await this._jobQueue.markComplete(job); + await this._baseJobRunner.processEvent(job); }); } @@ -141,11 +132,6 @@ export const main = async (): Promise => { const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, config.server.subgraphPath); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); - const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -155,6 +141,14 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + + // Watching all the contracts in the subgraph. + await graphWatcher.addContracts(); + const jobRunner = new JobRunner(jobQueueConfig, indexer, jobQueue); await jobRunner.start(); }; diff --git a/packages/graph-test-watcher/src/resolvers.ts b/packages/graph-test-watcher/src/resolvers.ts index 93f38417..a9fbc1b9 100644 --- a/packages/graph-test-watcher/src/resolvers.ts +++ b/packages/graph-test-watcher/src/resolvers.ts @@ -12,6 +12,7 @@ import { Indexer } from './indexer'; import { EventWatcher } from './events'; import { ExampleEntity } from './entity/ExampleEntity'; +import { RelatedEntity } from './entity/RelatedEntity'; const log = debug('vulcanize:resolver'); @@ -36,10 +37,11 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch }, Mutation: { - watchContract: (_: any, { address, kind, checkpoint, startingBlock }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { + watchContract: async (_: any, { address, kind, checkpoint, startingBlock = 1 }: { address: string, kind: string, checkpoint: boolean, startingBlock: number }): Promise => { log('watchContract', address, kind, checkpoint, startingBlock); + await indexer.watchContract(address, kind, checkpoint, startingBlock); - return indexer.watchContract(address, kind, checkpoint, startingBlock); + return true; } }, @@ -54,6 +56,12 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch return indexer._test(blockHash, contractAddress); }, + relatedEntity: async (_: any, { id, blockHash }: { id: string, blockHash: string }): Promise => { + log('relatedEntity', id, blockHash); + + return indexer.getSubgraphEntity(RelatedEntity, id, blockHash); + }, + exampleEntity: async (_: any, { id, blockHash }: { id: string, blockHash: string }): Promise => { log('exampleEntity', id, blockHash); diff --git a/packages/graph-test-watcher/src/schema.gql b/packages/graph-test-watcher/src/schema.gql index ece2fa65..13b47675 100644 --- a/packages/graph-test-watcher/src/schema.gql +++ b/packages/graph-test-watcher/src/schema.gql @@ -72,6 +72,7 @@ type Query { eventsInRange(fromBlockNumber: Int!, toBlockNumber: Int!): [ResultEvent!] getMethod(blockHash: String!, contractAddress: String!): ResultString! _test(blockHash: String!, contractAddress: String!): ResultBigInt! + relatedEntity(id: String!, blockHash: String!): RelatedEntity! exampleEntity(id: String!, blockHash: String!): ExampleEntity! getStateByCID(cid: String!): ResultIPLDBlock getState(blockHash: String!, contractAddress: String!, kind: String): ResultIPLDBlock @@ -82,6 +83,13 @@ enum EnumType { choice2 } +type RelatedEntity { + id: ID! + paramBigInt: BigInt! + examples: [ExampleEntity!]! + bigIntArray: [BigInt!]! +} + type ExampleEntity { id: ID! count: BigInt! @@ -91,7 +99,7 @@ type ExampleEntity { paramBytes: Bytes! paramEnum: EnumType! paramBigDecimal: BigDecimal! - related: String! + related: RelatedEntity! } type Mutation { diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index aec430ec..acc268f0 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -51,10 +51,6 @@ export const main = async (): Promise => { // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -64,6 +60,11 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); if (watcherKind === KIND_ACTIVE) { diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index 0792ddf4..59f2b46d 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -312,8 +312,10 @@ export class Indexer { assert(this._db.saveContract); const dbTx = await this._db.createTransactionRunner(); - // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). - const contractAddress = ethers.utils.getAddress(address); + // Use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress) if input to address is a contract address. + // If a contract identifier is passed as address instead, no need to convert to checksum address. + // Customize: use the kind input to filter out non-contract-address input to address. + const contractAddress = (kind === '__protocol__') ? address : ethers.utils.getAddress(address); try { const contract = await this._db.saveContract(dbTx, contractAddress, kind, checkpoint, startingBlock); diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 85d11d80..c49304f3 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -73,7 +73,7 @@ export interface IndexerInterface { isWatchedContract?: (address: string) => Promise; cacheContract?: (contract: ContractInterface) => void; createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise - watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock?: number) => Promise + watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise } export interface EventWatcherInterface { diff --git a/yarn.lock b/yarn.lock index b0c6e346..8f0ee7dd 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4209,6 +4209,11 @@ binary-extensions@^2.0.0: resolved "https://registry.yarnpkg.com/binary-extensions/-/binary-extensions-2.2.0.tgz#75f502eeaf9ffde42fc98829645be4ea76bd9e2d" integrity sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA== +binaryen@101.0.0-nightly.20210723: + version "101.0.0-nightly.20210723" + resolved "https://registry.yarnpkg.com/binaryen/-/binaryen-101.0.0-nightly.20210723.tgz#b6bb7f3501341727681a03866c0856500eec3740" + integrity sha512-eioJNqhHlkguVSbblHOtLqlhtC882SOEPKmNFZaDuz1hzQjolxZ+eu3/kaS10n3sGPONsIZsO7R9fR00UyhEUA== + bindings@^1.2.1: version "1.5.0" resolved "https://registry.yarnpkg.com/bindings/-/bindings-1.5.0.tgz#10353c9e945334bc0511a6d90b38fbc7c9c504df" @@ -4216,11 +4221,6 @@ bindings@^1.2.1: dependencies: file-uri-to-path "1.0.0" -binaryen@101.0.0-nightly.20210723: - version "101.0.0-nightly.20210723" - resolved "https://registry.yarnpkg.com/binaryen/-/binaryen-101.0.0-nightly.20210723.tgz#b6bb7f3501341727681a03866c0856500eec3740" - integrity sha512-eioJNqhHlkguVSbblHOtLqlhtC882SOEPKmNFZaDuz1hzQjolxZ+eu3/kaS10n3sGPONsIZsO7R9fR00UyhEUA== - bip39@2.5.0: version "2.5.0" resolved "https://registry.yarnpkg.com/bip39/-/bip39-2.5.0.tgz#51cbd5179460504a63ea3c000db3f787ca051235" @@ -10646,6 +10646,13 @@ node-fetch@^2: resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.2.tgz#986996818b73785e47b1965cc34eb093a1d464d0" integrity sha512-aLoxToI6RfZ+0NOjmWAgn9+LEd30YCkJKFSyWacNZdEKTit/ZMcKjGkTRo8uWEsnIb/hfKecNPEbln02PdWbcA== +node-fetch@^2.6.5: + version "2.6.6" + resolved "https://registry.yarnpkg.com/node-fetch/-/node-fetch-2.6.6.tgz#1751a7c01834e8e1697758732e9efb6eeadfaf89" + integrity sha512-Z8/6vRlTUChSdIgMa51jxQ4lrw/Jy5SOW10ObaA47/RElsAN2c5Pn8bTgFGWn/ibwzXTE8qwr1Yzx28vsecXEA== + dependencies: + whatwg-url "^5.0.0" + "node-fetch@https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz": version "2.6.7" resolved "https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz#1b5d62978f2ed07b99444f64f0df39f960a6d34d" @@ -11863,7 +11870,7 @@ proto-list@~1.2.1: resolved "https://registry.yarnpkg.com/proto-list/-/proto-list-1.2.4.tgz#212d5bfe1318306a420f6402b8e26ff39647a849" integrity sha1-IS1b/hMYMGpCD2QCuOJv85ZHqEk= -protobufjs@^6.10.2: +protobufjs@^6.10.2, protobufjs@~6.11.0: version "6.11.2" resolved "https://registry.yarnpkg.com/protobufjs/-/protobufjs-6.11.2.tgz#de39fabd4ed32beaa08e9bb1e30d08544c1edf8b" integrity sha512-4BQJoPooKJl2G9j3XftkIXjoC9C0Av2NOrWmbLWT1vH32GcSUHjM0Arra6UfTsVyfMAuFzaLucXn1sadxJydAw==