Implement WASM instance restart to handle out of memory error (#81)

* Test case for wasm out of memory error

* Restart wasm instance after N blocks

* Handle out of memory error and re instantiate WASM

* Remove old instance from map before reinstantiating WASM
This commit is contained in:
nikugogoi 2021-12-21 15:58:17 +05:30 committed by nabarun
parent 68bc1c00db
commit 198e49e5a0
29 changed files with 181 additions and 63 deletions

View File

@ -13,6 +13,7 @@
ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"
subgraphPath = "../graph-node/test/subgraph/eden" subgraphPath = "../graph-node/test/subgraph/eden"
wasmRestartBlocksInterval = 20
[database] [database]
type = "postgres" type = "postgres"

View File

@ -49,7 +49,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -56,7 +56,7 @@ export const handler = async (argv: any): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -62,7 +62,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -21,8 +21,7 @@ import {
QUEUE_IPFS, QUEUE_IPFS,
JobQueueConfig, JobQueueConfig,
DEFAULT_CONFIG_PATH, DEFAULT_CONFIG_PATH,
initClients, initClients
JOB_KIND_INDEX
} from '@vulcanize/util'; } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
@ -57,12 +56,6 @@ export class JobRunner {
// TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).
await this._baseJobRunner.processBlock(job); await this._baseJobRunner.processBlock(job);
const { data: { kind, blockHash, blockNumber } } = job;
if (kind === JOB_KIND_INDEX) {
await this._indexer.processBlock(blockHash, blockNumber);
}
}); });
} }
@ -138,7 +131,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -80,3 +80,7 @@ export function testLog (): void {
log.error('Error message', []); log.error('Error message', []);
log.critical('Critical message', []); log.critical('Critical message', []);
} }
export function testMemory (value: string): void {
log.debug('testMemory value:', [value.slice(0, 10)]);
}

View File

@ -30,7 +30,7 @@
"scripts": { "scripts": {
"lint": "eslint .", "lint": "eslint .",
"build": "tsc", "build": "tsc",
"asbuild:debug": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target debug --runPasses asyncify", "asbuild:debug": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target debug --runPasses asyncify --runtime stub --maximumMemory 10",
"asbuild:release": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target release --runPasses asyncify", "asbuild:release": "asc assembly/index.ts --lib ./node_modules --exportRuntime --target release --runPasses asyncify",
"asbuild": "yarn asbuild:debug && yarn asbuild:release", "asbuild": "yarn asbuild:debug && yarn asbuild:release",
"test": "yarn asbuild:debug && DEBUG=vulcanize:* mocha src/**/*.test.ts", "test": "yarn asbuild:debug && DEBUG=vulcanize:* mocha src/**/*.test.ts",

View File

@ -4,6 +4,7 @@
import path from 'path'; import path from 'path';
import { expect } from 'chai'; import { expect } from 'chai';
import { utils } from 'ethers';
import { BaseProvider } from '@ethersproject/providers'; import { BaseProvider } from '@ethersproject/providers';
@ -19,6 +20,7 @@ describe('wasm loader tests', () => {
let db: Database; let db: Database;
let indexer: Indexer; let indexer: Indexer;
let provider: BaseProvider; let provider: BaseProvider;
let module: WebAssembly.Module;
before(async () => { before(async () => {
db = getTestDatabase(); db = getTestDatabase();
@ -35,6 +37,7 @@ describe('wasm loader tests', () => {
); );
exports = instance.exports; exports = instance.exports;
module = instance.module;
}); });
it('should execute exported function', async () => { it('should execute exported function', async () => {
@ -76,4 +79,42 @@ describe('wasm loader tests', () => {
// Should print all log messages for different levels. // Should print all log messages for different levels.
await testLog(); await testLog();
}); });
it('should throw out of memory error', async () => {
// Maximum memory is set to 10 pages (640KB) when compiling using asc maximumMemory option.
// https://www.assemblyscript.org/compiler.html#command-line-options
const { testMemory, __newString, memory } = exports;
try {
// Continue loop until memory size reaches max size 640KB
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Memory/buffer
while (memory.buffer.byteLength <= 1024 * 640) {
// Create long string of 100KB.
const longString = utils.hexValue(utils.randomBytes(1024 * 100 / 2));
const stringPtr = await __newString(longString);
await testMemory(stringPtr);
}
expect.fail('wasm code should throw error');
} catch (error) {
expect(error).to.be.instanceof(WebAssembly.RuntimeError);
expect(error.message).to.equal('unreachable');
}
});
it('should reinstantiate wasm', async () => {
const instance = await instantiate(
db,
indexer,
provider,
{ event: {} },
module
);
exports = instance.exports;
const { callGraphAPI } = exports;
await callGraphAPI();
});
}); });

View File

@ -34,13 +34,11 @@ const BN_ENDIANNESS = 'le';
type idOfType = (TypeId: number) => number type idOfType = (TypeId: number) => number
interface DataSource { export interface GraphData {
address: string
}
interface GraphData {
abis?: {[key: string]: ContractInterface}; abis?: {[key: string]: ContractInterface};
dataSource?: DataSource; dataSource?: {
address: string
};
} }
export interface Context { export interface Context {
@ -56,11 +54,16 @@ export const instantiate = async (
indexer: IndexerInterface, indexer: IndexerInterface,
provider: BaseProvider, provider: BaseProvider,
context: Context, context: Context,
filePath: string, filePathOrModule: string | WebAssembly.Module,
data: GraphData = {} data: GraphData = {}
): Promise<loader.ResultObject & { exports: any }> => { ): Promise<loader.ResultObject & { exports: any }> => {
const { abis = {}, dataSource } = data; const { abis = {}, dataSource } = data;
const buffer = await fs.readFile(filePath);
let source = filePathOrModule;
if (!(filePathOrModule instanceof WebAssembly.Module)) {
source = await fs.readFile(filePathOrModule);
}
const imports: WebAssembly.Imports = { const imports: WebAssembly.Imports = {
index: { index: {
@ -580,7 +583,7 @@ export const instantiate = async (
} }
}; };
const instance = await loader.instantiate(buffer, imports); const instance = await loader.instantiate(source, imports);
const { exports: instanceExports } = instance; const { exports: instanceExports } = instance;
const { __getString, __newString, __getArray, __newArray } = instanceExports; const { __getString, __newString, __getArray, __newArray } = instanceExports;

View File

@ -11,17 +11,18 @@ import { ContractInterface, utils, providers } from 'ethers';
import { ResultObject } from '@vulcanize/assemblyscript/lib/loader'; import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
import { EthClient } from '@vulcanize/ipld-eth-client'; import { EthClient } from '@vulcanize/ipld-eth-client';
import { IndexerInterface, getFullBlock, BlockHeight } from '@vulcanize/util'; import { IndexerInterface, getFullBlock, BlockHeight, ServerConfig } from '@vulcanize/util';
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts } from './utils'; import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts } from './utils';
import { Context, instantiate } from './loader'; import { Context, GraphData, instantiate } from './loader';
import { Database } from './database'; import { Database } from './database';
const log = debug('vulcanize:graph-watcher'); const log = debug('vulcanize:graph-watcher');
interface DataSource { interface DataSource {
instance: ResultObject & { exports: any }, instance?: ResultObject & { exports: any },
contractInterface: utils.Interface contractInterface: utils.Interface,
data: GraphData,
} }
export class GraphWatcher { export class GraphWatcher {
@ -30,6 +31,7 @@ export class GraphWatcher {
_postgraphileClient: EthClient; _postgraphileClient: EthClient;
_ethProvider: providers.BaseProvider; _ethProvider: providers.BaseProvider;
_subgraphPath: string; _subgraphPath: string;
_wasmRestartBlocksInterval: number;
_dataSources: any[] = []; _dataSources: any[] = [];
_dataSourceMap: { [key: string]: DataSource } = {}; _dataSourceMap: { [key: string]: DataSource } = {};
@ -37,11 +39,12 @@ export class GraphWatcher {
event: {} event: {}
} }
constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, subgraphPath: string) { constructor (database: Database, postgraphileClient: EthClient, ethProvider: providers.BaseProvider, serverConfig: ServerConfig) {
this._database = database; this._database = database;
this._postgraphileClient = postgraphileClient; this._postgraphileClient = postgraphileClient;
this._ethProvider = ethProvider; this._ethProvider = ethProvider;
this._subgraphPath = subgraphPath; this._subgraphPath = serverConfig.subgraphPath;
this._wasmRestartBlocksInterval = serverConfig.wasmRestartBlocksInterval;
} }
async init () { async init () {
@ -76,7 +79,8 @@ export class GraphWatcher {
return { return {
instance: await instantiate(this._database, this._indexer, this._ethProvider, this._context, filePath, data), instance: await instantiate(this._database, this._indexer, this._ethProvider, this._context, filePath, data),
contractInterface contractInterface,
data
}; };
}, {}); }, {});
@ -130,7 +134,9 @@ export class GraphWatcher {
return; return;
} }
const { instance: { exports: instanceExports }, contractInterface } = this._dataSourceMap[contract]; const { instance, contractInterface } = this._dataSourceMap[contract];
assert(instance);
const { exports: instanceExports } = instance;
// Get event handler based on event topic (from event signature). // Get event handler based on event topic (from event signature).
const eventTopic = contractInterface.getEventTopic(eventSignature); const eventTopic = contractInterface.getEventTopic(eventSignature);
@ -162,7 +168,7 @@ export class GraphWatcher {
// Create ethereum event to be passed to the wasm event handler. // Create ethereum event to be passed to the wasm event handler.
const ethereumEvent = await createEvent(instanceExports, contract, data); const ethereumEvent = await createEvent(instanceExports, contract, data);
await instanceExports[eventHandler.handler](ethereumEvent); await this._handleMemoryError(instanceExports[eventHandler.handler](ethereumEvent), dataSource.source.address);
} }
async handleBlock (blockHash: string) { async handleBlock (blockHash: string) {
@ -172,12 +178,23 @@ export class GraphWatcher {
// Call block handler(s) for each contract. // Call block handler(s) for each contract.
for (const dataSource of this._dataSources) { for (const dataSource of this._dataSources) {
// Reinstantiate WASM after every N blocks.
if (blockData.blockNumber % this._wasmRestartBlocksInterval === 0) {
// The WASM instance allocates memory as required and the limit is 4GB.
// https://stackoverflow.com/a/40453962
// https://github.com/AssemblyScript/assemblyscript/pull/1268#issue-618411291
// https://github.com/WebAssembly/memory64/blob/main/proposals/memory64/Overview.md#motivation
await this._reInitWasm(dataSource.source.address);
}
// Check if block handler(s) are configured and start block has been reached. // Check if block handler(s) are configured and start block has been reached.
if (!dataSource.mapping.blockHandlers || blockData.blockNumber < dataSource.source.startBlock) { if (!dataSource.mapping.blockHandlers || blockData.blockNumber < dataSource.source.startBlock) {
continue; continue;
} }
const { instance: { exports: instanceExports } } = this._dataSourceMap[dataSource.source.address]; const { instance } = this._dataSourceMap[dataSource.source.address];
assert(instance);
const { exports: instanceExports } = instance;
// Create ethereum block to be passed to a wasm block handler. // Create ethereum block to be passed to a wasm block handler.
const ethereumBlock = await createBlock(instanceExports, blockData); const ethereumBlock = await createBlock(instanceExports, blockData);
@ -187,7 +204,7 @@ export class GraphWatcher {
await instanceExports[blockHandler.handler](ethereumBlock); await instanceExports[blockHandler.handler](ethereumBlock);
}); });
await Promise.all(blockHandlerPromises); await this._handleMemoryError(Promise.all(blockHandlerPromises), dataSource.source.address);
} }
} }
@ -202,4 +219,48 @@ export class GraphWatcher {
// Resolve any field name conflicts in the entity result. // Resolve any field name conflicts in the entity result.
return resolveEntityFieldConflicts(result); return resolveEntityFieldConflicts(result);
} }
/**
* Method to reinstantiate WASM instance for specified contract address.
* @param contractAddress
*/
async _reInitWasm (contractAddress: string): Promise<void> {
const { data, instance } = this._dataSourceMap[contractAddress];
assert(instance);
const { module } = instance;
delete this._dataSourceMap[contractAddress].instance;
assert(this._indexer);
// Reinstantiate with existing module.
this._dataSourceMap[contractAddress].instance = await instantiate(
this._database,
this._indexer,
this._ethProvider,
this._context,
module,
data
);
// Important to call _start for built subgraphs on instantiation!
// TODO: Check api version https://github.com/graphprotocol/graph-node/blob/6098daa8955bdfac597cec87080af5449807e874/runtime/wasm/src/module/mod.rs#L533
this._dataSourceMap[contractAddress].instance!.exports._start();
}
async _handleMemoryError (handlerPromise: Promise<any>, contractAddress: string): Promise<void> {
try {
await handlerPromise;
} catch (error) {
if (error instanceof WebAssembly.RuntimeError && error instanceof Error) {
if (error.message === 'unreachable') {
// Reintantiate WASM for out of memory error.
this._reInitWasm(contractAddress);
}
}
// Job will retry after throwing error.
throw error;
}
}
} }

View File

@ -1,5 +1,5 @@
import assert from 'assert'; import assert from 'assert';
import { DeepPartial } from 'typeorm'; import { FindConditions, FindManyOptions } from 'typeorm';
import { import {
IndexerInterface, IndexerInterface,
@ -51,19 +51,19 @@ export class Indexer implements IndexerInterface {
return ''; return '';
} }
async getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>> { async fetchBlockEvents (block: BlockProgressInterface): Promise<BlockProgressInterface> {
assert(block); return block;
return [];
} }
async removeUnknownEvents (block: BlockProgressInterface): Promise<void> { async removeUnknownEvents (block: BlockProgressInterface): Promise<void> {
assert(block); assert(block);
} }
async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void> { async updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
assert(blockHash); assert(block);
assert(lastProcessedEventIndex); assert(lastProcessedEventIndex);
return block;
} }
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> { async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
@ -104,6 +104,21 @@ export class Indexer implements IndexerInterface {
getEntityTypesMap (): Map<string, { [key: string]: string; }> { getEntityTypesMap (): Map<string, { [key: string]: string; }> {
return new Map(); return new Map();
} }
async getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]> {
assert(where);
assert(options);
return [];
}
async saveEventEntity (dbEvent: EventInterface): Promise<EventInterface> {
return dbEvent;
}
async processEvent (event: EventInterface): Promise<void> {
assert(event);
}
} }
class SyncStatus implements SyncStatusInterface { class SyncStatus implements SyncStatusInterface {

View File

@ -13,6 +13,7 @@
ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001" ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"
subgraphPath = "../graph-node/test/subgraph/example1/build" subgraphPath = "../graph-node/test/subgraph/example1/build"
wasmRestartBlocksInterval = 20
[database] [database]
type = "postgres" type = "postgres"

View File

@ -49,7 +49,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -46,7 +46,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -44,7 +44,7 @@ export const handler = async (argv: any): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -62,7 +62,7 @@ const main = async (): Promise<void> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -61,7 +61,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -21,8 +21,7 @@ import {
QUEUE_IPFS, QUEUE_IPFS,
JobQueueConfig, JobQueueConfig,
DEFAULT_CONFIG_PATH, DEFAULT_CONFIG_PATH,
initClients, initClients
JOB_KIND_INDEX
} from '@vulcanize/util'; } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
@ -57,12 +56,6 @@ export class JobRunner {
// TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)). // TODO Call pre-block hook here (Directly or indirectly (Like done through indexer.processEvent for events)).
await this._baseJobRunner.processBlock(job); await this._baseJobRunner.processBlock(job);
const { data: { kind, blockHash, blockNumber } } = job;
if (kind === JOB_KIND_INDEX) {
await this._indexer.processBlock(blockHash, blockNumber);
}
}); });
} }
@ -138,7 +131,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue; const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config'); assert(jobQueueConfig, 'Missing job queue config');

View File

@ -46,7 +46,7 @@ export const main = async (): Promise<any> => {
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*')); const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
await graphDb.init(); await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server.subgraphPath); const graphWatcher = new GraphWatcher(graphDb, postgraphileClient, ethProvider, config.server);
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway. // Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries // Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries

View File

@ -33,6 +33,7 @@ export interface ServerConfig {
checkpointInterval: number; checkpointInterval: number;
ipfsApiAddr: string; ipfsApiAddr: string;
subgraphPath: string; subgraphPath: string;
wasmRestartBlocksInterval: number;
} }
export interface UpstreamConfig { export interface UpstreamConfig {

View File

@ -232,6 +232,10 @@ export class JobRunner {
await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true }); await this._jobQueue.pushJob(QUEUE_EVENT_PROCESSING, { kind: JOB_KIND_EVENTS, blockHash: blockProgress.blockHash, publish: true });
} }
if (this._indexer.processBlock) {
await this._indexer.processBlock(blockHash, blockNumber);
}
const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime(); const indexBlockDuration = new Date().getTime() - indexBlockStartTime.getTime();
log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`); log(`time:job-runner#_indexBlock: ${indexBlockDuration}ms`);
} }

View File

@ -98,6 +98,7 @@ export interface IndexerInterface {
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void> createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
processInitialState?: (contractAddress: string, blockHash: string) => Promise<any> processInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean> processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
processBlock?: (blockHash: string, blockNumber: number) => Promise<void>
} }
export interface EventWatcherInterface { export interface EventWatcherInterface {