Add a CLI in eden-watcher to fill state for a given range (#176)

* Add a CLI to fill state for a given range

* Refactor code

* Add a CLI to reset IPLD state

* Replace ORDER BY clause in the query to get latest IPLD block

* Optimize delete query in CLI to reset IPLD state

* Add an option to decouple subgraph state creation from mapping code

* Use a raw SQL query to delete IPLD blocks in a block range

* Accomodate changes in codegen
This commit is contained in:
prathamesh0 2022-09-09 16:23:41 +05:30 committed by GitHub
parent 4e5ec36f07
commit e30af92901
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 569 additions and 99 deletions

View File

@ -26,7 +26,6 @@ export class Client {
/**
* Stores the query to be passed to the template.
* @param mode Code generation mode.
* @param name Name of the query.
* @param params Parameters to the query.
* @param returnType Return type for the query.

View File

@ -7,15 +7,24 @@ import path from 'path';
import Handlebars from 'handlebars';
import { Writable } from 'stream';
const TEMPLATE_FILE = './templates/fill-template.handlebars';
const FILL_TEMPLATE_FILE = './templates/fill-template.handlebars';
const FILL_STATE_TEMPLATE_FILE = './templates/fill-state-template.handlebars';
/**
* Writes the fill file generated from a template to a stream.
* @param outStream A writable output stream to write the fill file to.
* @param fillOutStream A writable output stream to write the fill file to.
* @param fillStateOutStream A writable output stream to write the fill state file to.
*/
export function exportFill (outStream: Writable, subgraphPath: string): void {
const templateString = fs.readFileSync(path.resolve(__dirname, TEMPLATE_FILE)).toString();
export function exportFill (fillOutStream: Writable, fillStateOutStream: Writable | undefined, subgraphPath: string): void {
const templateString = fs.readFileSync(path.resolve(__dirname, FILL_TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const fill = template({ subgraphPath });
outStream.write(fill);
fillOutStream.write(fill);
if (fillStateOutStream) {
const templateString = fs.readFileSync(path.resolve(__dirname, FILL_STATE_TEMPLATE_FILE)).toString();
const template = Handlebars.compile(templateString);
const fillState = template({});
fillStateOutStream.write(fillState);
}
}

View File

@ -246,10 +246,18 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) {
: process.stdout;
exportHooks(outStream);
outStream = outputDir
const fillOutStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/fill.ts'))
: process.stdout;
exportFill(outStream, config.subgraphPath);
let fillStateOutStream;
if (config.subgraphPath) {
fillStateOutStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/fill-state.ts'))
: process.stdout;
}
exportFill(fillOutStream, fillStateOutStream, config.subgraphPath);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/types.ts'))
@ -273,19 +281,25 @@ function generateWatcher (visitor: Visitor, contracts: any[], config: any) {
: process.stdout;
visitor.exportClient(outStream, schemaContent, path.join(outputDir, 'src/gql'));
let resetOutStream, resetJQOutStream, resetStateOutStream;
let resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream;
if (outputDir) {
resetOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset.ts'));
resetJQOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/job-queue.ts'));
resetStateOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/state.ts'));
if (config.subgraphPath) {
resetIPLDStateOutStream = fs.createWriteStream(path.join(outputDir, 'src/cli/reset-cmds/ipld-state.ts'));
}
} else {
resetOutStream = process.stdout;
resetJQOutStream = process.stdout;
resetStateOutStream = process.stdout;
if (config.subgraphPath) {
resetIPLDStateOutStream = process.stdout;
}
}
visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, config.subgraphPath);
visitor.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream, config.subgraphPath);
outStream = outputDir
? fs.createWriteStream(path.join(outputDir, 'src/cli/export-state.ts'))

View File

@ -35,7 +35,7 @@ export class Indexer {
* @param name Name of the query.
* @param params Parameters to the query.
* @param returnType Return type for the query.
* @param stateVariableTypeName Type of the state variable in case of state variable query.
* @param stateVariableType Type of the state variable in case of state variable query.
*/
addQuery (contract: string, mode: string, name: string, params: Array<Param>, returnType: string, stateVariableType?: string): void {
// Check if the query is already added.

View File

@ -10,25 +10,26 @@ import { Writable } from 'stream';
const RESET_TEMPLATE_FILE = './templates/reset-template.handlebars';
const RESET_JQ_TEMPLATE_FILE = './templates/reset-job-queue-template.handlebars';
const RESET_STATE_TEMPLATE_FILE = './templates/reset-state-template.handlebars';
const RESET_IPLD_STATE_TEMPLATE_FILE = './templates/reset-ipld-state-template.handlebars';
export class Reset {
_queries: Array<any>;
_resetTemplateString: string;
_resetJQTemplateString: string;
_resetStateTemplateString: string;
_resetIPLDStateTemplateString: string;
constructor () {
this._queries = [];
this._resetTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_TEMPLATE_FILE)).toString();
this._resetJQTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_JQ_TEMPLATE_FILE)).toString();
this._resetStateTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_STATE_TEMPLATE_FILE)).toString();
this._resetIPLDStateTemplateString = fs.readFileSync(path.resolve(__dirname, RESET_IPLD_STATE_TEMPLATE_FILE)).toString();
}
/**
* Stores the query to be passed to the template.
* @param name Name of the query.
* @param params Parameters to the query.
* @param returnType Return type for the query.
*/
addQuery (name: string): void {
// Check if the query is already added.
@ -74,8 +75,9 @@ export class Reset {
* @param resetOutStream A writable output stream to write the reset file to.
* @param resetJQOutStream A writable output stream to write the reset job-queue file to.
* @param resetStateOutStream A writable output stream to write the reset state file to.
* @param resetIPLDStateOutStream A writable output stream to write the reset IPLD state file to.
*/
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, subgraphPath: string): void {
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, resetIPLDStateOutStream: Writable | undefined, subgraphPath: string): void {
const resetTemplate = Handlebars.compile(this._resetTemplateString);
const resetString = resetTemplate({});
resetOutStream.write(resetString);
@ -91,5 +93,11 @@ export class Reset {
};
const resetState = resetStateTemplate(obj);
resetStateOutStream.write(resetState);
if (resetIPLDStateOutStream) {
const resetIPLDStateTemplate = Handlebars.compile(this._resetIPLDStateTemplateString);
const resetIPLDStateString = resetIPLDStateTemplate({});
resetIPLDStateOutStream.write(resetIPLDStateString);
}
}
}

View File

@ -14,6 +14,12 @@
{{#if (subgraphPath)}}
subgraphPath = "{{subgraphPath}}"
# Disable creation of state from subgraph entity updates
# CAUTION: Disable only if subgraph state is not desired or can be filled subsequently
disableSubgraphState = false
# Interval to restart wasm instance periodically
wasmRestartBlocksInterval = 20
{{/if}}

View File

@ -120,6 +120,12 @@ export class Database implements IPLDDatabaseInterface {
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}
async removeIPLDBlocksInRange (dbTx: QueryRunner, startBlock: number, endBlock: number): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);
await this._baseDatabase.removeIPLDBlocksInRange(repo, startBlock, endBlock);
}
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);

View File

@ -0,0 +1,96 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { Between } from 'typeorm';
import { Database as GraphDatabase, prepareEntityState } from '@vulcanize/graph-node';
import { Indexer } from './indexer';
const log = debug('vulcanize:fill-state');
export const fillState = async (
indexer: Indexer,
graphDb: GraphDatabase,
dataSources: any[],
argv: {
startBlock: number,
endBlock: number
}
): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
// NOTE: Assuming all blocks in the given range are in the pruned region
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
// Check that there are no existing diffs in this range
const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
}
// Map: contractAddress -> entities updated
const contractEntitiesMap: Map<string, string[]> = new Map();
// Populate contractEntitiesMap using data sources from subgraph
// NOTE: Assuming each entity type is only mapped to a single contract
// This is true for eden subgraph; may not be the case for other subgraphs
dataSources.forEach((dataSource: any) => {
const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
contractEntitiesMap.set(contractAddress, entities as string[]);
});
// Fill state for blocks in the given range
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
// Get the canonical block hash at current height
const blocks = await indexer.getBlocksAtHeight(blockNumber, false);
if (blocks.length === 0) {
log(`block not found at height ${blockNumber}`);
process.exit(1);
} else if (blocks.length > 1) {
log(`found more than one non-pruned block at height ${blockNumber}`);
process.exit(1);
}
const blockHash = blocks[0].blockHash;
// Fill state for each contract in contractEntitiesMap
const contractStatePromises = Array.from(contractEntitiesMap.entries())
.map(async ([contractAddress, entities]): Promise<void> => {
// Get all the updated entities at this block
const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
return graphDb.getEntitiesForBlock(blockHash, entity);
});
const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);
// Populate state with all the updated entities of each entity type
updatedEntitiesList.forEach((updatedEntities, index) => {
const entityName = entities[index];
updatedEntities.forEach((updatedEntity) => {
// Prepare diff data for the entity update
const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());
// Update the in-memory subgraph state
indexer.updateSubgraphState(contractAddress, diffData);
});
});
});
await Promise.all(contractStatePromises);
// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
}
log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
};

View File

@ -20,6 +20,9 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
{{#if (subgraphPath)}}
import { fillState } from './fill-state';
{{/if}}
const log = debug('vulcanize:server');
@ -41,6 +44,13 @@ export const main = async (): Promise<any> => {
demandOption: true,
describe: 'Block number to start processing at'
},
{{#if (subgraphPath)}}
state: {
type: 'boolean',
default: false,
describe: 'Fill state for subgraph entities'
},
{{/if}}
endBlock: {
type: 'number',
demandOption: true,
@ -86,6 +96,11 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
if (argv.state) {
await fillState(indexer, graphDb, graphWatcher.dataSources, argv);
return;
}
{{/if}}
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.

View File

@ -386,6 +386,10 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getIPLDBlockByCid(cid);
}
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
return this._db.getIPLDBlocks(where);
}
getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
}
@ -469,7 +473,7 @@ export class Indexer implements IPLDIndexerInterface {
await this._graphWatcher.handleBlock(blockHash);
// Persist subgraph state to the DB.
await this._dumpSubgraphState(blockHash);
await this.dumpSubgraphState(blockHash);
}
{{/if}}
@ -673,6 +677,23 @@ export class Indexer implements IPLDIndexerInterface {
this._subgraphStateMap.set(contractAddress, updatedData);
}
async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
if (isStateFinalized) {
return this.createDiff(contractAddress, blockHash, data);
}
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
}
_populateEntityTypesMap (): void {
{{#each subgraphEntities as | subgraphEntity |}}
this._entityTypesMap.set('{{subgraphEntity.className}}', {
@ -710,19 +731,6 @@ export class Indexer implements IPLDIndexerInterface {
{{/if}}
{{/each}}
}
async _dumpSubgraphState (blockHash: string): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
}
{{/if}}
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {

View File

@ -15,6 +15,9 @@
"job-runner:dev": "DEBUG=vulcanize:* ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
{{#if (subgraphPath)}}
"fill:state": "DEBUG=vulcanize:* ts-node src/fill.ts --state",
{{/if}}
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts",
"export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts",

View File

@ -0,0 +1,55 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import { getConfig } from '@vulcanize/util';
import { Database } from '../../database';
const log = debug('vulcanize:reset-ipld-state');
export const command = 'ipld-state';
export const desc = 'Reset IPLD state in the given range';
export const builder = {
startBlock: {
type: 'number'
},
endBlock: {
type: 'number'
}
};
export const handler = async (argv: any): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
const config = await getConfig(argv.configFile);
// Initialize database
const db = new Database(config.database);
await db.init();
// Create a DB transaction
const dbTx = await db.createTransactionRunner();
try {
// Delete all IPLDBlock entries in the given range
await db.removeIPLDBlocksInRange(dbTx, startBlock, endBlock);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
log(`Reset ipld-state successfully for range [${startBlock}, ${endBlock}]`);
};

View File

@ -215,8 +215,8 @@ export class Visitor {
* @param resetJQOutStream A writable output stream to write the reset job-queue file to.
* @param resetStateOutStream A writable output stream to write the reset state file to.
*/
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, subgraphPath: string): void {
this._reset.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, subgraphPath);
exportReset (resetOutStream: Writable, resetJQOutStream: Writable, resetStateOutStream: Writable, resetIPLDStateOutStream: Writable | undefined, subgraphPath: string): void {
this._reset.exportReset(resetOutStream, resetJQOutStream, resetStateOutStream, resetIPLDStateOutStream, subgraphPath);
}
/**

View File

@ -13,6 +13,12 @@
# ipfsApiAddr = "/ip4/127.0.0.1/tcp/5001"
subgraphPath = "../graph-node/test/subgraph/eden"
# Disable creation of state from subgraph entity updates
# CAUTION: Disable only if subgraph state is not desired or can be filled subsequently
disableSubgraphState = false
# Interval to restart wasm instance periodically
wasmRestartBlocksInterval = 20
# Boolean to filter logs by contract.

View File

@ -15,6 +15,7 @@
"job-runner:dev": "DEBUG=vulcanize:* YARN_CHILD_PROCESS=true ts-node src/job-runner.ts",
"watch:contract": "DEBUG=vulcanize:* ts-node src/cli/watch-contract.ts",
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"fill:state": "DEBUG=vulcanize:* ts-node src/fill.ts --state",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts",
"export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts",

View File

@ -0,0 +1,57 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import debug from 'debug';
import { getConfig } from '@vulcanize/util';
import { Database } from '../../database';
const log = debug('vulcanize:reset-ipld-state');
export const command = 'ipld-state';
export const desc = 'Reset IPLD state in the given range';
export const builder = {
startBlock: {
type: 'number'
},
endBlock: {
type: 'number'
}
};
export const handler = async (argv: any): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
const config = await getConfig(argv.configFile);
// Initialize database
const db = new Database(config.database);
await db.init();
// Create a DB transaction
const dbTx = await db.createTransactionRunner();
console.time('time:reset-ipld-state');
try {
// Delete all IPLDBlock entries in the given range
await db.removeIPLDBlocksInRange(dbTx, startBlock, endBlock);
dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
console.timeEnd('time:reset-ipld-state');
log(`Reset ipld-state successfully for range [${startBlock}, ${endBlock}]`);
};

View File

@ -80,6 +80,12 @@ export class Database implements IPLDDatabaseInterface {
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}
async removeIPLDBlocksInRange (dbTx: QueryRunner, startBlock: number, endBlock: number): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);
await this._baseDatabase.removeIPLDBlocksInRange(repo, startBlock, endBlock);
}
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);

View File

@ -0,0 +1,104 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import 'reflect-metadata';
import debug from 'debug';
import { Between } from 'typeorm';
import { Database as GraphDatabase, prepareEntityState } from '@vulcanize/graph-node';
import { Indexer } from './indexer';
const log = debug('vulcanize:fill-state');
export const fillState = async (
indexer: Indexer,
graphDb: GraphDatabase,
dataSources: any[],
argv: {
startBlock: number,
endBlock: number
}
): Promise<void> => {
const { startBlock, endBlock } = argv;
if (startBlock > endBlock) {
log('endBlock should be greater than or equal to startBlock');
process.exit(1);
}
// NOTE: Assuming all blocks in the given range are in the pruned region
log(`Filling state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
// Check that there are no existing diffs in this range
const existingStates = await indexer.getIPLDBlocks({ block: { blockNumber: Between(startBlock, endBlock) } });
if (existingStates.length > 0) {
log('found existing state(s) in the given range');
process.exit(1);
}
// Map: contractAddress -> entities updated
const contractEntitiesMap: Map<string, string[]> = new Map();
// Populate contractEntitiesMap using data sources from subgraph
// NOTE: Assuming each entity type is only mapped to a single contract
// This is true for eden subgraph; may not be the case for other subgraphs
dataSources.forEach((dataSource: any) => {
const { source: { address: contractAddress }, mapping: { entities } } = dataSource;
contractEntitiesMap.set(contractAddress, entities as string[]);
});
console.time('time:fill-state');
// Fill state for blocks in the given range
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
console.time(`time:fill-state-${blockNumber}`);
// Get the canonical block hash at current height
const blocks = await indexer.getBlocksAtHeight(blockNumber, false);
if (blocks.length === 0) {
log(`block not found at height ${blockNumber}`);
process.exit(1);
} else if (blocks.length > 1) {
log(`found more than one non-pruned block at height ${blockNumber}`);
process.exit(1);
}
const blockHash = blocks[0].blockHash;
// Fill state for each contract in contractEntitiesMap
const contractStatePromises = Array.from(contractEntitiesMap.entries())
.map(async ([contractAddress, entities]): Promise<void> => {
// Get all the updated entities at this block
const updatedEntitiesListPromises = entities.map(async (entity): Promise<any[]> => {
return graphDb.getEntitiesForBlock(blockHash, entity);
});
const updatedEntitiesList = await Promise.all(updatedEntitiesListPromises);
// Populate state with all the updated entities of each entity type
updatedEntitiesList.forEach((updatedEntities, index) => {
const entityName = entities[index];
updatedEntities.forEach((updatedEntity) => {
// Prepare diff data for the entity update
const diffData = prepareEntityState(updatedEntity, entityName, indexer.getRelationsMap());
// Update the in-memory subgraph state
indexer.updateSubgraphState(contractAddress, diffData);
});
});
});
await Promise.all(contractStatePromises);
// Persist subgraph state to the DB
await indexer.dumpSubgraphState(blockHash, true);
console.timeEnd(`time:fill-state-${blockNumber}`);
}
console.timeEnd('time:fill-state');
log(`Filled state for subgraph entities in range: [${startBlock}, ${endBlock}]`);
};

View File

@ -16,6 +16,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
import { fillState } from './fill-state';
const log = debug('vulcanize:server');
@ -51,6 +52,11 @@ export const main = async (): Promise<any> => {
type: 'number',
default: 10,
describe: 'Number of blocks prefetched in batch'
},
state: {
type: 'boolean',
default: false,
describe: 'Fill state for subgraph entities'
}
}).argv;
@ -80,6 +86,11 @@ export const main = async (): Promise<any> => {
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
if (argv.state) {
await fillState(indexer, graphDb, graphWatcher.dataSources, argv);
return;
}
// Note: In-memory pubsub works fine for now, as each watcher is a single process anyway.
// Later: https://www.apollographql.com/docs/apollo-server/data/subscriptions/#production-pubsub-libraries
const pubsub = new PubSub();

View File

@ -308,6 +308,10 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getIPLDBlockByCid(cid);
}
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
return this._db.getIPLDBlocks(where);
}
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
return this._db.getDiffIPLDBlocksInRange(contractAddress, startBlock, endBlock);
}
@ -410,7 +414,7 @@ export class Indexer implements IPLDIndexerInterface {
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
// Persist subgraph state to the DB.
await this._dumpSubgraphState(blockHash);
await this.dumpSubgraphState(blockHash);
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
}
@ -610,6 +614,23 @@ export class Indexer implements IPLDIndexerInterface {
this._subgraphStateMap.set(contractAddress, updatedData);
}
async dumpSubgraphState (blockHash: string, isStateFinalized = false): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
if (isStateFinalized) {
return this.createDiff(contractAddress, blockHash, data);
}
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
}
_populateEntityTypesMap (): void {
this._entityTypesMap.set(
'Producer',
@ -971,19 +992,6 @@ export class Indexer implements IPLDIndexerInterface {
});
}
async _dumpSubgraphState (blockHash: string): Promise<void> {
// Create a diff for each contract in the subgraph state map.
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
.map(([contractAddress, data]): Promise<void> => {
return this.createDiffStaged(contractAddress, blockHash, data);
});
await Promise.all(createDiffPromises);
// Reset the subgraph state map.
this._subgraphStateMap.clear();
}
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
assert(blockHash);
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });

View File

@ -82,6 +82,18 @@ export class Database {
}
}
async getEntitiesForBlock (blockHash: string, tableName: string): Promise<any[]> {
const repo = this._conn.getRepository(tableName);
const entities = await repo.find({
where: {
blockHash
}
});
return entities;
}
async getEntityIdsAtBlockNumber (blockNumber: number, tableName: string): Promise<string[]> {
const repo = this._conn.getRepository(tableName);

View File

@ -1,2 +1,3 @@
export * from './watcher';
export * from './database';
export { prepareEntityState } from './utils';

View File

@ -22,10 +22,10 @@ import {
Block,
fromEthereumValue,
toEthereumValue,
resolveEntityFieldConflicts,
getEthereumTypes,
jsonFromBytes,
getStorageValueType
getStorageValueType,
prepareEntityState
} from './utils';
import { Database } from './database';
@ -94,53 +94,19 @@ export const instantiate = async (
const entityInstance = await Entity.wrap(data);
assert(context.block);
let dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
const dbData = await database.fromGraphEntity(instanceExports, context.block, entityName, entityInstance);
await database.saveEntity(entityName, dbData);
// Resolve any field name conflicts in the dbData for auto-diff.
dbData = resolveEntityFieldConflicts(dbData);
// Update the in-memory subgraph state if not disabled.
if (!indexer.serverConfig.disableSubgraphState) {
// Prepare diff data for the entity update
assert(indexer.getRelationsMap);
const diffData = prepareEntityState(dbData, entityName, indexer.getRelationsMap());
// Prepare the diff data.
const diffData: any = { state: {} };
assert(indexer.getRelationsMap);
const result = Array.from(indexer.getRelationsMap().entries())
.find(([key]) => key.name === entityName);
if (result) {
// Update dbData if relations exist.
const [_, relations] = result;
// Update relation fields for diff data to be similar to GQL query entities.
Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => {
if (isDerived || !dbData[relation]) {
// Field is not present in dbData for derived relations
return;
}
if (isArray) {
dbData[relation] = dbData[relation]
.map((id: string) => ({ id }))
.sort((a: any, b: any) => a.id.localeCompare(b.id));
} else {
dbData[relation] = { id: dbData[relation] };
}
});
assert(indexer.updateSubgraphState);
assert(context.contractAddress);
indexer.updateSubgraphState(context.contractAddress, diffData);
}
// JSON stringify and parse data for handling unknown types when encoding.
// For example, decimal.js values are converted to string in the diff data.
diffData.state[entityName] = {
// Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor.
// TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode.
// https://github.com/rvagg/cborg#type-encoders
[dbData.id]: JSON.parse(JSON.stringify(dbData, jsonBigIntStringReplacer))
};
// Update the in-memory subgraph state.
assert(indexer.updateSubgraphState);
assert(context.contractAddress);
indexer.updateSubgraphState(context.contractAddress, diffData);
},
'log.log': (level: number, msg: number) => {

View File

@ -6,7 +6,7 @@ import yaml from 'js-yaml';
import { ColumnMetadata } from 'typeorm/metadata/ColumnMetadata';
import assert from 'assert';
import { GraphDecimal } from '@vulcanize/util';
import { GraphDecimal, jsonBigIntStringReplacer } from '@vulcanize/util';
import { TypeId, EthereumValueKind, ValueKind } from './types';
import { MappingKey, StorageLayout } from '@vulcanize/solidity-mapper';
@ -798,3 +798,46 @@ const getEthereumType = (storageTypes: StorageLayout['types'], type: string, map
return utils.ParamType.from(label);
};
export const prepareEntityState = (updatedEntity: any, entityName: string, relationsMap: Map<any, { [key: string]: any }>): any => {
// Resolve any field name conflicts in the dbData for auto-diff.
updatedEntity = resolveEntityFieldConflicts(updatedEntity);
// Prepare the diff data.
const diffData: any = { state: {} };
const result = Array.from(relationsMap.entries())
.find(([key]) => key.name === entityName);
if (result) {
// Update entity data if relations exist.
const [_, relations] = result;
// Update relation fields for diff data to be similar to GQL query entities.
Object.entries(relations).forEach(([relation, { isArray, isDerived }]) => {
if (isDerived || !updatedEntity[relation]) {
// Field is not present in dbData for derived relations
return;
}
if (isArray) {
updatedEntity[relation] = updatedEntity[relation]
.map((id: string) => ({ id }))
.sort((a: any, b: any) => a.id.localeCompare(b.id));
} else {
updatedEntity[relation] = { id: updatedEntity[relation] };
}
});
}
// JSON stringify and parse data for handling unknown types when encoding.
// For example, decimal.js values are converted to string in the diff data.
diffData.state[entityName] = {
// Using custom replacer to store bigints as string values to be encoded by IPLD dag-cbor.
// TODO: Parse and store as native bigint by using Type encoders in IPLD dag-cbor encode.
// https://github.com/rvagg/cborg#type-encoders
[updatedEntity.id]: JSON.parse(JSON.stringify(updatedEntity, jsonBigIntStringReplacer))
};
return diffData;
};

View File

@ -101,6 +101,10 @@ export class GraphWatcher {
}, {});
}
get dataSources (): any[] {
return this._dataSources;
}
async addContracts () {
assert(this._indexer);
assert(this._indexer.watchContract);

View File

@ -188,6 +188,7 @@ class ServerConfig implements ServerConfigInterface {
checkpointInterval: number;
ipfsApiAddr: string;
subgraphPath: string;
disableSubgraphState: boolean;
wasmRestartBlocksInterval: number;
filterLogs: boolean;
maxEventsBlockRange: number;
@ -201,6 +202,7 @@ class ServerConfig implements ServerConfigInterface {
this.checkpointInterval = 0;
this.ipfsApiAddr = '';
this.subgraphPath = '';
this.disableSubgraphState = false;
this.wasmRestartBlocksInterval = 0;
this.filterLogs = false;
this.maxEventsBlockRange = 0;

View File

@ -3,11 +3,9 @@
//
import yargs from 'yargs';
import { ethers, providers } from 'ethers';
import { providers } from 'ethers';
import debug from 'debug';
import { readAbi } from './common';
const log = debug('vulcanize:test');
const main = async (): Promise<void> => {

View File

@ -33,6 +33,7 @@ export interface ServerConfig {
checkpointInterval: number;
ipfsApiAddr: string;
subgraphPath: string;
disableSubgraphState: boolean;
wasmRestartBlocksInterval: number;
filterLogs: boolean;
maxEventsBlockRange: number;

View File

@ -22,16 +22,33 @@ export class IPLDDatabase extends Database {
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
}
// Filter using kind if specified else order by id to give preference to checkpoint.
// Filter using kind if specified else avoid diff_staged block.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged })
.addOrderBy('ipld_block.id', 'DESC');
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged });
// Get the first entry.
queryBuilder.limit(1);
// Get the first two entries.
queryBuilder.limit(2);
return queryBuilder.getOne();
const results = await queryBuilder.getMany();
switch (results.length) {
case 0:
// No result found.
return;
case 1:
// Return the only IPLD block entry found.
return results[0];
case 2:
// If there are two entries in the result and both are at the same block number, give preference to checkpoint kind.
if (results[0].block.blockNumber === results[1].block.blockNumber) {
return (results[1].kind === StateKind.Checkpoint) ? results[1] : results[0];
} else {
return results[0];
}
default:
throw new Error(`Unexpected results length ${results.length}`);
}
}
async getPrevIPLDBlock (repo: Repository<IPLDBlockInterface>, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlockInterface | undefined> {
@ -148,6 +165,20 @@ export class IPLDDatabase extends Database {
}
}
async removeIPLDBlocksInRange (repo: Repository<IPLDBlockInterface>, startBlock: number, endBlock: number): Promise<void> {
// Use raw SQL as TypeORM curently doesn't support delete via 'join' or 'using'
const deleteQuery = `
DELETE FROM
ipld_block
USING block_progress
WHERE
ipld_block.block_id = block_progress.id
AND block_progress.block_number BETWEEN $1 AND $2;
`;
await repo.query(deleteQuery, [startBlock, endBlock]);
}
async getIPLDStatus (repo: Repository<IpldStatusInterface>): Promise<IpldStatusInterface | undefined> {
return repo.findOne();
}