diff --git a/packages/codegen/src/reset.ts b/packages/codegen/src/reset.ts index b4ae6fd7..6799c99e 100644 --- a/packages/codegen/src/reset.ts +++ b/packages/codegen/src/reset.ts @@ -50,6 +50,20 @@ export class Reset { this._queries.push(queryObject); } + addSubgraphEntities (subgraphSchemaDocument: any): void { + const subgraphTypeDefs = subgraphSchemaDocument.definitions; + + subgraphTypeDefs.forEach((def: any) => { + if (def.kind !== 'ObjectTypeDefinition') { + return; + } + + this._queries.push({ + entityName: def.name.value + }); + }); + } + /** * Writes the reset.ts, job-queue.ts, state.ts files generated from templates to respective streams. * @param outStream A writable output stream to write the database file to. diff --git a/packages/codegen/src/templates/checkpoint-template.handlebars b/packages/codegen/src/templates/checkpoint-template.handlebars index 2f2b3c86..eda5011b 100644 --- a/packages/codegen/src/templates/checkpoint-template.handlebars +++ b/packages/codegen/src/templates/checkpoint-template.handlebars @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -48,9 +49,19 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/codegen/src/templates/export-state-template.handlebars b/packages/codegen/src/templates/export-state-template.handlebars index f2ae170a..cbd90e19 100644 --- a/packages/codegen/src/templates/export-state-template.handlebars +++ b/packages/codegen/src/templates/export-state-template.handlebars @@ -9,7 +9,7 @@ import debug from 'debug'; import fs from 'fs'; import path from 'path'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; @@ -46,9 +46,19 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); graphWatcher.setIndexer(indexer); await graphWatcher.init(); @@ -59,10 +69,10 @@ const main = async (): Promise => { ipldCheckpoints: [] }; - const contracts = await db.getContracts({}); + const contracts = await db.getContracts(); - // Get latest canonical block. - const block = await indexer.getLatestCanonicalBlock(); + // Get latest block with hooks processed. + const block = await indexer.getLatestHooksProcessedBlock(); assert(block); // Export snapshot block. @@ -84,10 +94,14 @@ const main = async (): Promise => { if (contract.checkpoint) { await indexer.createCheckpoint(contract.address, block.blockHash); - const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber); + const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber); assert(ipldBlock); - const data = codec.decode(Buffer.from(ipldBlock.data)) as any; + const data = indexer.getIPLDData(ipldBlock); + + if (indexer.isIPFSConfigured()) { + await indexer.pushToIPFS(data); + } exportData.ipldCheckpoints.push({ contractAddress: ipldBlock.contractAddress, diff --git a/packages/codegen/src/templates/import-state-template.handlebars b/packages/codegen/src/templates/import-state-template.handlebars index b8985490..1ba4d2dc 100644 --- a/packages/codegen/src/templates/import-state-template.handlebars +++ b/packages/codegen/src/templates/import-state-template.handlebars @@ -11,7 +11,7 @@ import { PubSub } from 'apollo-server-express'; import fs from 'fs'; import path from 'path'; -import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients } from '@vulcanize/util'; +import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import * as codec from '@ipld/dag-cbor'; @@ -50,15 +50,11 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries const pubsub = new PubSub(); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); - - graphWatcher.setIndexer(indexer); - await graphWatcher.init(); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); @@ -69,6 +65,12 @@ export const main = async (): Promise => { const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); await jobQueue.start(); + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); + + graphWatcher.setIndexer(indexer); + await graphWatcher.init(); + const eventWatcher = new EventWatcher(config.upstream, ethClient, postgraphileClient, indexer, pubsub, jobQueue); // Import data. @@ -80,7 +82,6 @@ export const main = async (): Promise => { await fillBlocks( jobQueue, indexer, - postgraphileClient, eventWatcher, config.upstream.ethServer.blockDelayInMilliSecs, { @@ -91,7 +92,7 @@ export const main = async (): Promise => { // Fill the Contracts. for (const contract of importData.contracts) { - await db.saveContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); + await indexer.watchContract(contract.address, contract.kind, contract.checkpoint, contract.startingBlock); } // Get the snapshot block. @@ -107,8 +108,12 @@ export const main = async (): Promise => { ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data)); - await db.saveOrUpdateIPLDBlock(ipldBlock); + await indexer.saveOrUpdateIPLDBlock(ipldBlock); } + + // The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block. + await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init); + await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged); }; main().catch(err => { diff --git a/packages/codegen/src/templates/inspect-cid-template.handlebars b/packages/codegen/src/templates/inspect-cid-template.handlebars index 501ad8ac..e1d6d9ec 100644 --- a/packages/codegen/src/templates/inspect-cid-template.handlebars +++ b/packages/codegen/src/templates/inspect-cid-template.handlebars @@ -9,7 +9,7 @@ import 'reflect-metadata'; import debug from 'debug'; import util from 'util'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -46,9 +46,19 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/codegen/src/templates/reset-state-template.handlebars b/packages/codegen/src/templates/reset-state-template.handlebars index 023fd22e..5ef3b12d 100644 --- a/packages/codegen/src/templates/reset-state-template.handlebars +++ b/packages/codegen/src/templates/reset-state-template.handlebars @@ -7,7 +7,7 @@ import debug from 'debug'; import { MoreThan } from 'typeorm'; import assert from 'assert'; -import { getConfig, initClients, resetJobs } from '@vulcanize/util'; +import { getConfig, initClients, resetJobs, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../../database'; @@ -42,9 +42,19 @@ export const handler = async (argv: any): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); graphWatcher.setIndexer(indexer); await graphWatcher.init(); @@ -87,6 +97,8 @@ export const handler = async (argv: any): Promise => { await indexer.updateHookStatusProcessedBlock(blockProgress.blockNumber, true); } + await indexer.updateSyncStatusChainHead(blockProgress.blockHash, blockProgress.blockNumber, true); + dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); diff --git a/packages/codegen/src/templates/watch-contract-template.handlebars b/packages/codegen/src/templates/watch-contract-template.handlebars index 431f2c33..1402d587 100644 --- a/packages/codegen/src/templates/watch-contract-template.handlebars +++ b/packages/codegen/src/templates/watch-contract-template.handlebars @@ -6,8 +6,9 @@ import path from 'path'; import yargs from 'yargs'; import 'reflect-metadata'; import debug from 'debug'; +import assert from 'assert'; -import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients } from '@vulcanize/util'; +import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../database'; @@ -47,6 +48,7 @@ const main = async (): Promise => { }, startingBlock: { type: 'number', + default: 1, describe: 'Starting block' } }).argv; @@ -60,9 +62,19 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); - const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, graphWatcher); + const jobQueueConfig = config.jobQueue; + assert(jobQueueConfig, 'Missing job queue config'); + + const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig; + assert(dbConnectionString, 'Missing job queue db connection string'); + + const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs }); + await jobQueue.start(); + + const indexer = new Indexer(config.server, db, ethClient, postgraphileClient, ethProvider, jobQueue, graphWatcher); + await indexer.init(); graphWatcher.setIndexer(indexer); await graphWatcher.init(); diff --git a/packages/codegen/src/visitor.ts b/packages/codegen/src/visitor.ts index 8100f7ca..c6727e0a 100644 --- a/packages/codegen/src/visitor.ts +++ b/packages/codegen/src/visitor.ts @@ -118,6 +118,7 @@ export class Visitor { this._schema.addSubgraphSchema(subgraphSchemaDocument); this._entity.addSubgraphEntities(subgraphSchemaDocument); this._resolvers.addSubgraphResolvers(subgraphSchemaDocument); + this._reset.addSubgraphEntities(subgraphSchemaDocument); } /**