From 198e49e5a0c02b0c4807d02cf4bf6f9d8a57570a Mon Sep 17 00:00:00 2001 From: nikugogoi Date: Tue, 21 Dec 2021 15:58:17 +0530 Subject: [PATCH] Implement WASM instance restart to handle out of memory error (#81) * Test case for wasm out of memory error * Restart wasm instance after N blocks * Handle out of memory error and re instantiate WASM * Remove old instance from map before reinstantiating WASM --- packages/eden-watcher/environments/local.toml | 3 +- packages/eden-watcher/src/cli/checkpoint.ts | 2 +- packages/eden-watcher/src/cli/export-state.ts | 2 +- packages/eden-watcher/src/cli/import-state.ts | 2 +- packages/eden-watcher/src/cli/inspect-cid.ts | 2 +- .../eden-watcher/src/cli/reset-cmds/state.ts | 2 +- .../eden-watcher/src/cli/watch-contract.ts | 2 +- packages/eden-watcher/src/fill.ts | 2 +- packages/eden-watcher/src/job-runner.ts | 11 +-- packages/eden-watcher/src/server.ts | 2 +- packages/graph-node/assembly/index.ts | 4 + packages/graph-node/package.json | 2 +- packages/graph-node/src/loader.test.ts | 41 +++++++++ packages/graph-node/src/loader.ts | 21 +++-- packages/graph-node/src/watcher.ts | 83 ++++++++++++++++--- packages/graph-node/test/utils/indexer.ts | 29 +++++-- .../environments/local.toml | 1 + .../graph-test-watcher/src/cli/checkpoint.ts | 2 +- .../src/cli/export-state.ts | 2 +- .../src/cli/import-state.ts | 2 +- .../graph-test-watcher/src/cli/inspect-cid.ts | 2 +- .../src/cli/reset-cmds/state.ts | 2 +- .../src/cli/watch-contract.ts | 2 +- packages/graph-test-watcher/src/fill.ts | 2 +- packages/graph-test-watcher/src/job-runner.ts | 11 +-- packages/graph-test-watcher/src/server.ts | 2 +- packages/util/src/config.ts | 1 + packages/util/src/job-runner.ts | 4 + packages/util/src/types.ts | 1 + 29 files changed, 181 insertions(+), 63 deletions(-) diff --git a/packages/eden-watcher/environments/local.toml b/packages/eden-watcher/environments/local.toml index 58438a90..708ef471 100644 --- a/packages/eden-watcher/environments/local.toml +++ b/packages/eden-watcher/environments/local.toml @@ -11,8 +11,9 @@ # IPFS API address (can be taken from the output on running the IPFS daemon). ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" - + subgraphPath = "../graph-node/test/subgraph/eden" + wasmRestartBlocksInterval = 20 [database] type = "postgres" diff --git a/packages/eden-watcher/src/cli/checkpoint.ts b/packages/eden-watcher/src/cli/checkpoint.ts index 9b5fc3e2..eda5011b 100644 --- a/packages/eden-watcher/src/cli/checkpoint.ts +++ b/packages/eden-watcher/src/cli/checkpoint.ts @@ -49,7 +49,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/cli/export-state.ts b/packages/eden-watcher/src/cli/export-state.ts index 0cb3e9ed..cbd90e19 100644 --- a/packages/eden-watcher/src/cli/export-state.ts +++ b/packages/eden-watcher/src/cli/export-state.ts @@ -46,7 +46,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index e70c4d2e..1ba4d2dc 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -50,7 +50,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/eden-watcher/src/cli/inspect-cid.ts b/packages/eden-watcher/src/cli/inspect-cid.ts index 3922eb7a..e1d6d9ec 100644 --- a/packages/eden-watcher/src/cli/inspect-cid.ts +++ b/packages/eden-watcher/src/cli/inspect-cid.ts @@ -46,7 +46,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/cli/reset-cmds/state.ts b/packages/eden-watcher/src/cli/reset-cmds/state.ts index bbfb2610..75f1ec80 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/state.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/state.ts @@ -56,7 +56,7 @@ export const handler = async (argv: any): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/cli/watch-contract.ts b/packages/eden-watcher/src/cli/watch-contract.ts index ac00e24b..1402d587 100644 --- a/packages/eden-watcher/src/cli/watch-contract.ts +++ b/packages/eden-watcher/src/cli/watch-contract.ts @@ -62,7 +62,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/fill.ts b/packages/eden-watcher/src/fill.ts index 4fe36987..aa9e2ef3 100644 --- a/packages/eden-watcher/src/fill.ts +++ b/packages/eden-watcher/src/fill.ts @@ -61,7 +61,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/job-runner.ts b/packages/eden-watcher/src/job-runner.ts index f969139e..6b363991 100644 --- a/packages/eden-watcher/src/job-runner.ts +++ b/packages/eden-watcher/src/job-runner.ts @@ -21,8 +21,7 @@ import { QUEUE_IPFS, JobQueueConfig, DEFAULT_CONFIG_PATH, - initClients, - JOB_KIND_INDEX + initClients } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; @@ -57,12 +56,6 @@ export class JobRunner { // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). await this._baseJobRunner.processBlock(job); - - const { data: { kind, blockHash, blockNumber } } = job; - - if (kind === JOB_KIND_INDEX) { - await this._indexer.processBlock(blockHash, blockNumber); - } }); } @@ -138,7 +131,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/eden-watcher/src/server.ts b/packages/eden-watcher/src/server.ts index 426e062d..cee22e5a 100644 --- a/packages/eden-watcher/src/server.ts +++ b/packages/eden-watcher/src/server.ts @@ -46,7 +46,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/graph-node/assembly/index.ts b/packages/graph-node/assembly/index.ts index acf593d7..905d1cae 100644 --- a/packages/graph-node/assembly/index.ts +++ b/packages/graph-node/assembly/index.ts @@ -80,3 +80,7 @@ export function testLog (): void { log.error('Error message', []); log.critical('Critical message', []); } + +export function testMemory (value: string): void { + log.debug('testMemory value:', [value.slice(0, 10)]); +} diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index 055fc9fc..c405c744 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -30,7 +30,7 @@ "scripts": { "lint": "eslint .", "build": "tsc", - "asbuild:debug": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target debug --runPasses asyncify", + "asbuild:debug": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target debug --runPasses asyncify --runtime stub --maximumMemory 10", "asbuild:release": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target release --runPasses asyncify", "asbuild": "yarn asbuild:debug && yarn asbuild:release", "test": "yarn asbuild:debug && DEBUG=vulcanize:* mocha src/**/*.test.ts", diff --git a/packages/graph-node/src/loader.test.ts b/packages/graph-node/src/loader.test.ts index 3a4dceb7..d97c2c64 100644 --- a/packages/graph-node/src/loader.test.ts +++ b/packages/graph-node/src/loader.test.ts @@ -4,6 +4,7 @@ import path from 'path'; import { expect } from 'chai'; +import { utils } from 'ethers'; import { BaseProvider } from '@ethersproject/providers'; @@ -19,6 +20,7 @@ describe('wasm loader tests', () => { let db: Database; let indexer: Indexer; let provider: BaseProvider; + let module: WebAssembly.Module; before(async () => { db = getTestDatabase(); @@ -35,6 +37,7 @@ describe('wasm loader tests', () => { ); exports = instance.exports; + module = instance.module; }); it('should execute exported function', async () => { @@ -76,4 +79,42 @@ describe('wasm loader tests', () => { // Should print all log messages for different levels. await testLog(); }); + + it('should throw out of memory error', async () => { + // Maximum memory is set to 10 pages (640KB) when compiling using asc maximumMemory option. + // https://www.assemblyscript.org/compiler.html#command-line-options + + const { testMemory, __newString, memory } = exports; + + try { + // Continue loop until memory size reaches max size 640KB + // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Memory/buffer + while (memory.buffer.byteLength <= 1024 * 640) { + // Create long string of 100KB. + const longString = utils.hexValue(utils.randomBytes(1024 * 100 / 2)); + + const stringPtr = await __newString(longString); + await testMemory(stringPtr); + } + + expect.fail('wasm code should throw error'); + } catch (error) { + expect(error).to.be.instanceof(WebAssembly.RuntimeError); + expect(error.message).to.equal('unreachable'); + } + }); + + it('should reinstantiate wasm', async () => { + const instance = await instantiate( + db, + indexer, + provider, + { event: {} }, + module + ); + + exports = instance.exports; + const { callGraphAPI } = exports; + await callGraphAPI(); + }); }); diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index a471a5e5..d9ddddf1 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -34,13 +34,11 @@ const BN_ENDIANNESS = 'le'; type idOfType = (TypeId: number) => number -interface DataSource { - address: string -} - -interface GraphData { +export interface GraphData { abis?: {[key: string]: ContractInterface}; - dataSource?: DataSource; + dataSource?: { + address: string + }; } export interface Context { @@ -56,11 +54,16 @@ export const instantiate = async ( indexer: IndexerInterface, provider: BaseProvider, context: Context, - filePath: string, + filePathOrModule: string | WebAssembly.Module, data: GraphData = {} ): Promise => { const { abis = {}, dataSource } = data; - const buffer = await fs.readFile(filePath); + + let source = filePathOrModule; + + if (!(filePathOrModule instanceof WebAssembly.Module)) { + source = await fs.readFile(filePathOrModule); + } const imports: WebAssembly.Imports = { index: { @@ -580,7 +583,7 @@ export const instantiate = async ( } }; - const instance = await loader.instantiate(buffer, imports); + const instance = await loader.instantiate(source, imports); const { exports: instanceExports } = instance; const { __getString, __newString, __getArray, __newArray } = instanceExports; diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 3c14e31f..a8e710e8 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -11,17 +11,18 @@ import { ContractInterface, utils, providers } from 'ethers'; import { ResultObject } from '@vulcanize/assemblyscript/lib/loader'; import { EthClient } from '@vulcanize/ipld-eth-client'; -import { IndexerInterface, getFullBlock, BlockHeight } from '@vulcanize/util'; +import { IndexerInterface, getFullBlock, BlockHeight, ServerConfig } from '@vulcanize/util'; import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts } from './utils'; -import { Context, instantiate } from './loader'; +import { Context, GraphData, instantiate } from './loader'; import { Database } from './database'; const log = debug('vulcanize:graph-watcher'); interface DataSource { - instance: ResultObject & { exports: any }, - contractInterface: utils.Interface + instance?: ResultObject & { exports: any }, + contractInterface: utils.Interface, + data: GraphData, } export class GraphWatcher { @@ -30,6 +31,7 @@ export class GraphWatcher { _postgraphileClient: EthClient; _ethProvider: providers.BaseProvider; _subgraphPath: string; + _wasmRestartBlocksInterval: number; _dataSources: any[] = []; _dataSourceMap: { [key: string]: DataSource } = {}; @@ -37,11 +39,12 @@ export class GraphWatcher { event: {} } - constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, subgraphPath: string) { + constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, serverConfig: ServerConfig) { this._database = database; this._postgraphileClient = postgraphileClient; this._ethProvider = ethProvider; - this._subgraphPath = subgraphPath; + this._subgraphPath = serverConfig.subgraphPath; + this._wasmRestartBlocksInterval = serverConfig.wasmRestartBlocksInterval; } async init () { @@ -76,7 +79,8 @@ export class GraphWatcher { return { instance: await instantiate(this._database, this._indexer, this._ethProvider, this._context, filePath, data), - contractInterface + contractInterface, + data }; }, {}); @@ -130,7 +134,9 @@ export class GraphWatcher { return; } - const { instance: { exports: instanceExports }, contractInterface } = this._dataSourceMap[contract]; + const { instance, contractInterface } = this._dataSourceMap[contract]; + assert(instance); + const { exports: instanceExports } = instance; // Get event handler based on event topic (from event signature). const eventTopic = contractInterface.getEventTopic(eventSignature); @@ -162,7 +168,7 @@ export class GraphWatcher { // Create ethereum event to be passed to the wasm event handler. const ethereumEvent = await createEvent(instanceExports, contract, data); - await instanceExports[eventHandler.handler](ethereumEvent); + await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.source.address); } async handleBlock (blockHash: string) { @@ -172,12 +178,23 @@ export class GraphWatcher { // Call block handler(s) for each contract. for (const dataSource of this._dataSources) { + // Reinstantiate WASM after every N blocks. + if (blockData.blockNumber % this._wasmRestartBlocksInterval === 0) { + // The WASM instance allocates memory as required and the limit is 4GB. + // https://stackoverflow.com/a/40453962 + // https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291 + // https://github.com/WebAssembly/memory64/blob/main/proposals/memory64/Overview.md#motivation + await this._reInitWasm(dataSource.source.address); + } + // Check if block handler(s) are configured and start block has been reached. if (!dataSource.mapping.blockHandlers || blockData.blockNumber < dataSource.source.startBlock) { continue; } - const { instance: { exports: instanceExports } } = this._dataSourceMap[dataSource.source.address]; + const { instance } = this._dataSourceMap[dataSource.source.address]; + assert(instance); + const { exports: instanceExports } = instance; // Create ethereum block to be passed to a wasm block handler. const ethereumBlock = await createBlock(instanceExports, blockData); @@ -187,7 +204,7 @@ export class GraphWatcher { await instanceExports[blockHandler.handler](ethereumBlock); }); - await Promise.all(blockHandlerPromises); + await this._handleMemoryError(Promise.all(blockHandlerPromises), dataSource.source.address); } } @@ -202,4 +219,48 @@ export class GraphWatcher { // Resolve any field name conflicts in the entity result. return resolveEntityFieldConflicts(result); } + + /** + * Method to reinstantiate WASM instance for specified contract address. + * @param contractAddress + */ + async _reInitWasm (contractAddress: string): Promise { + const { data, instance } = this._dataSourceMap[contractAddress]; + + assert(instance); + const { module } = instance; + delete this._dataSourceMap[contractAddress].instance; + + assert(this._indexer); + + // Reinstantiate with existing module. + this._dataSourceMap[contractAddress].instance = await instantiate( + this._database, + this._indexer, + this._ethProvider, + this._context, + module, + data + ); + + // Important to call _start for built subgraphs on instantiation! + // TODO: Check api version https://github.com/graphprotocol/graph-node/blob/6098daa8955bdfac597cec87080af5449807e874/runtime/wasm/src/module/mod.rs#L533 + this._dataSourceMap[contractAddress].instance!.exports._start(); + } + + async _handleMemoryError (handlerPromise: Promise, contractAddress: string): Promise { + try { + await handlerPromise; + } catch (error) { + if (error instanceof WebAssembly.RuntimeError && error instanceof Error) { + if (error.message === 'unreachable') { + // Reintantiate WASM for out of memory error. + this._reInitWasm(contractAddress); + } + } + + // Job will retry after throwing error. + throw error; + } + } } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 359bb4d7..ee71043b 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -1,5 +1,5 @@ import assert from 'assert'; -import { DeepPartial } from 'typeorm'; +import { FindConditions, FindManyOptions } from 'typeorm'; import { IndexerInterface, @@ -51,19 +51,19 @@ export class Indexer implements IndexerInterface { return ''; } - async getOrFetchBlockEvents (block: DeepPartial): Promise> { - assert(block); - - return []; + async fetchBlockEvents (block: BlockProgressInterface): Promise { + return block; } async removeUnknownEvents (block: BlockProgressInterface): Promise { assert(block); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - assert(blockHash); + async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise { + assert(block); assert(lastProcessedEventIndex); + + return block; } async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { @@ -104,6 +104,21 @@ export class Indexer implements IndexerInterface { getEntityTypesMap (): Map { return new Map(); } + + async getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise { + assert(where); + assert(options); + + return []; + } + + async saveEventEntity (dbEvent: EventInterface): Promise { + return dbEvent; + } + + async processEvent (event: EventInterface): Promise { + assert(event); + } } class SyncStatus implements SyncStatusInterface { diff --git a/packages/graph-test-watcher/environments/local.toml b/packages/graph-test-watcher/environments/local.toml index 99c73955..44a5f0b9 100644 --- a/packages/graph-test-watcher/environments/local.toml +++ b/packages/graph-test-watcher/environments/local.toml @@ -13,6 +13,7 @@ ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" subgraphPath = "../graph-node/test/subgraph/example1/build" + wasmRestartBlocksInterval = 20 [database] type = "postgres" diff --git a/packages/graph-test-watcher/src/cli/checkpoint.ts b/packages/graph-test-watcher/src/cli/checkpoint.ts index 9b5fc3e2..eda5011b 100644 --- a/packages/graph-test-watcher/src/cli/checkpoint.ts +++ b/packages/graph-test-watcher/src/cli/checkpoint.ts @@ -49,7 +49,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/cli/export-state.ts b/packages/graph-test-watcher/src/cli/export-state.ts index 0cb3e9ed..cbd90e19 100644 --- a/packages/graph-test-watcher/src/cli/export-state.ts +++ b/packages/graph-test-watcher/src/cli/export-state.ts @@ -46,7 +46,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index e70c4d2e..1ba4d2dc 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -50,7 +50,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/graph-test-watcher/src/cli/inspect-cid.ts b/packages/graph-test-watcher/src/cli/inspect-cid.ts index 3922eb7a..e1d6d9ec 100644 --- a/packages/graph-test-watcher/src/cli/inspect-cid.ts +++ b/packages/graph-test-watcher/src/cli/inspect-cid.ts @@ -46,7 +46,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts index af6518f9..06cea258 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts @@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/cli/watch-contract.ts b/packages/graph-test-watcher/src/cli/watch-contract.ts index ac00e24b..1402d587 100644 --- a/packages/graph-test-watcher/src/cli/watch-contract.ts +++ b/packages/graph-test-watcher/src/cli/watch-contract.ts @@ -62,7 +62,7 @@ const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/fill.ts b/packages/graph-test-watcher/src/fill.ts index 4fe36987..aa9e2ef3 100644 --- a/packages/graph-test-watcher/src/fill.ts +++ b/packages/graph-test-watcher/src/fill.ts @@ -61,7 +61,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/job-runner.ts b/packages/graph-test-watcher/src/job-runner.ts index f969139e..6b363991 100644 --- a/packages/graph-test-watcher/src/job-runner.ts +++ b/packages/graph-test-watcher/src/job-runner.ts @@ -21,8 +21,7 @@ import { QUEUE_IPFS, JobQueueConfig, DEFAULT_CONFIG_PATH, - initClients, - JOB_KIND_INDEX + initClients } from '@vulcanize/util'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; @@ -57,12 +56,6 @@ export class JobRunner { // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). await this._baseJobRunner.processBlock(job); - - const { data: { kind, blockHash, blockNumber } } = job; - - if (kind === JOB_KIND_INDEX) { - await this._indexer.processBlock(blockHash, blockNumber); - } }); } @@ -138,7 +131,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); const jobQueueConfig = config.jobQueue; assert(jobQueueConfig, 'Missing job queue config'); diff --git a/packages/graph-test-watcher/src/server.ts b/packages/graph-test-watcher/src/server.ts index 426e062d..cee22e5a 100644 --- a/packages/graph-test-watcher/src/server.ts +++ b/packages/graph-test-watcher/src/server.ts @@ -46,7 +46,7 @@ export const main = async (): Promise => { const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); await graphDb.init(); - const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); + const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server); // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index fe9fd1a7..8ae223d1 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -33,6 +33,7 @@ export interface ServerConfig { checkpointInterval: number; ipfsApiAddr: string; subgraphPath: string; + wasmRestartBlocksInterval: number; } export interface UpstreamConfig { diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index fe0977db..595fe50e 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -232,6 +232,10 @@ export class JobRunner { await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); } + if (this._indexer.processBlock) { + await this._indexer.processBlock(blockHash, blockNumber); + } + const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); } diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 04591e24..c7754383 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -98,6 +98,7 @@ export interface IndexerInterface { createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise processInitialState?: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise + processBlock?: (blockHash: string, blockNumber: number) => Promise } export interface EventWatcherInterface {