Update watcher import CLI to create entities from checkpoint (#186)

* Import entities from checkpoint

* Fix IPLD state when updating subgraph entity

* Changes in codegen and other watchers

* Update IPLD state with all Block entities

* Add verify and create sub commands to checkpoint

* Add option for specifying snapshot block in export state CLI
This commit is contained in:
nikugogoi 2022-09-22 15:26:06 +05:30 committed by GitHub
parent b2cf997900
commit a8fdcca866
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 447 additions and 226 deletions

View File

@ -42,11 +42,6 @@ const main = async (): Promise<void> => {
describe: 'Configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
exportFile: {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
{{#if (subgraphPath)}}
verify: {
alias: 'v',
@ -55,11 +50,10 @@ const main = async (): Promise<void> => {
default: true
},
{{/if}}
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
exportFile: {
alias: 'o',
type: 'string',
describe: 'Export file path'
}
}).argv;
@ -122,10 +116,7 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
await indexer.createCheckpoint(contract.address, block.blockHash);
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -91,6 +91,7 @@ export const main = async (): Promise<any> => {
eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
@ -114,12 +115,23 @@ export const main = async (): Promise<any> => {
ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
ipldBlock = await indexer.saveOrUpdateIPLDBlock(ipldBlock);
{{#if (subgraphPath)}}
await graphWatcher.updateEntitiesFromIPLDState(ipldBlock);
{{/if}}
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
};
main().catch(err => {

View File

@ -57,12 +57,12 @@ const JSONbigNative = JSONbig({ useNativeBigInt: true });
{{#each contracts as | contract |}}
const KIND_{{capitalize contract.contractName}} = '{{contract.contractKind}}';
{{/each}}
{{/each}}
{{#each uniqueEvents as | event |}}
const {{capitalize event}}_EVENT = '{{event}}';
{{/each}}
{{/each}}
export type ResultEvent = {
block: {
cid: string;
@ -443,8 +443,6 @@ export class Indexer implements IPLDIndexerInterface {
{{#if (subgraphPath)}}
async getSubgraphEntity<Entity> (entity: new () => Entity, id: string, block?: BlockHeight): Promise<any> {
const relations = this._relationsMap.get(entity) || {};
const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block);
return data;

View File

@ -150,7 +150,7 @@ GQL console: http://localhost:{{port}}/graphql
yarn export-state --export-file [export-file-path]
```
* `export-file`: Path of JSON file to which to export the watcher data.
* `export-file`: Path of file to which to export the watcher data.
* In target watcher, run job-runner:
@ -164,15 +164,7 @@ GQL console: http://localhost:{{port}}/graphql
yarn import-state --import-file <import-file-path>
```
* `import-file`: Path of JSON file from which to import the watcher data.
* Run fill:
```bash
yarn fill --start-block <snapshot-block> --end-block <to-block>
```
* `snapshot-block`: Block number at which the watcher state was exported.
* `import-file`: Path of file from which to import the watcher data.
* Run server:

View File

@ -120,12 +120,20 @@ GQL console: http://localhost:3012/graphql
* To create a checkpoint for a contract:
```bash
yarn checkpoint --address <contract-address> --block-hash [block-hash]
yarn checkpoint create --address <contract-address> --block-hash [block-hash]
```
* `address`: Address or identifier of the contract for which to create a checkpoint.
* `block-hash`: Hash of a block (in the pruned region) at which to create the checkpoint (default: latest canonical block hash).
* To verify a checkpoint:
```bash
yarn checkpoint verify --cid <checkpoint-cid>
```
`cid`: CID of the checkpoint for which to verify.
* To reset the watcher to a previous block number:
* Reset state:
@ -147,10 +155,11 @@ GQL console: http://localhost:3012/graphql
* In source watcher, export watcher state:
```bash
yarn export-state --export-file [export-file-path]
yarn export-state --export-file [export-file-path] --block-number [snapshot-block-height]
```
* `export-file`: Path of JSON file to which to export the watcher data.
* `export-file`: Path of file to which to export the watcher data.
* `block-number`: Block height at which to take snapshot for export.
* In target watcher, run job-runner:
@ -164,15 +173,7 @@ GQL console: http://localhost:3012/graphql
yarn import-state --import-file <import-file-path>
```
* `import-file`: Path of JSON file from which to import the watcher data.
* Run fill:
```bash
yarn fill --start-block <snapshot-block> --end-block <to-block>
```
* `snapshot-block`: Block number at which the watcher state was exported.
* `import-file`: Path of file from which to import the watcher data.
* Run server:

View File

@ -17,9 +17,12 @@
"fill": "DEBUG=vulcanize:* ts-node src/fill.ts",
"fill:state": "DEBUG=vulcanize:* ts-node src/fill.ts --state",
"reset": "DEBUG=vulcanize:* ts-node src/cli/reset.ts",
"checkpoint": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts",
"export-state": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts",
"import-state": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts",
"checkpoint": "DEBUG=vulcanize:* node --enable-source-maps --max-old-space-size=3072 dist/cli/checkpoint.js",
"checkpoint:dev": "DEBUG=vulcanize:* ts-node src/cli/checkpoint.ts",
"export-state": "DEBUG=vulcanize:* node --enable-source-maps --max-old-space-size=3072 dist/cli/export-state.js",
"export-state:dev": "DEBUG=vulcanize:* ts-node src/cli/export-state.ts",
"import-state": "DEBUG=vulcanize:* node --enable-source-maps --max-old-space-size=3072 dist/cli/import-state.js",
"import-state:dev": "DEBUG=vulcanize:* ts-node src/cli/import-state.ts",
"inspect-cid": "DEBUG=vulcanize:* ts-node src/cli/inspect-cid.ts",
"index-block": "DEBUG=vulcanize:* ts-node src/cli/index-block.ts"
},
@ -35,12 +38,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"@cerc-io/graph-node": "^0.1.0",
"@cerc-io/ipld-eth-client": "^0.1.0",
"@cerc-io/solidity-mapper": "^0.1.0",
"@cerc-io/util": "^0.1.0",
"@ethersproject/providers": "^5.4.4",
"@ipld/dag-cbor": "^6.0.12",
"apollo-server-express": "^2.25.0",
"apollo-type-bigint": "^0.1.3",
"debug": "^4.3.1",

View File

@ -0,0 +1,66 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import path from 'path';
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, JobQueue, Config } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-create');
export const command = 'create';
export const desc = 'Create checkpoint';
export const builder = {
address: {
type: 'string',
require: true,
demandOption: true,
describe: 'Contract address to create the checkpoint for.'
},
blockHash: {
type: 'string',
describe: 'Blockhash at which to create the checkpoint.'
}
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../../entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash);
log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`);
await db.close();
};

View File

@ -0,0 +1,66 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import path from 'path';
import debug from 'debug';
import assert from 'assert';
import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-verify');
export const command = 'verify';
export const desc = 'Verify checkpoint';
export const builder = {
cid: {
alias: 'c',
type: 'string',
demandOption: true,
describe: 'Checkpoint CID to be verified'
}
};
export const handler = async (argv: any): Promise<void> => {
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../../entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const ipldBlock = await indexer.getIPLDBlockByCid(argv.cid);
assert(ipldBlock, 'IPLDBlock for the provided CID doesn\'t exist.');
const data = indexer.getIPLDData(ipldBlock);
log(`Verifying checkpoint data for contract ${ipldBlock.contractAddress}`);
await verifyCheckpointData(graphDb, ipldBlock.block, data);
log('Checkpoint data verified');
await db.close();
};

View File

@ -2,79 +2,38 @@
// Copyright 2021 Vulcanize, Inc.
//
import path from 'path';
import yargs from 'yargs';
import 'reflect-metadata';
import debug from 'debug';
import assert from 'assert';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { DEFAULT_CONFIG_PATH } from '@cerc-io/util';
import { Database } from '../database';
import { Indexer } from '../indexer';
import { hideBin } from 'yargs/helpers';
const log = debug('vulcanize:checkpoint');
const main = async (): Promise<void> => {
const argv = await yargs.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'Configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
},
address: {
type: 'string',
require: true,
demandOption: true,
describe: 'Contract address to create the checkpoint for.'
},
blockHash: {
type: 'string',
describe: 'Blockhash at which to create the checkpoint.'
}
}).argv;
const config: Config = await getConfig(argv.configFile);
const { ethClient, ethProvider } = await initClients(config);
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
assert(dbConnectionString, 'Missing job queue db connection string');
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
await jobQueue.start();
const indexer = new Indexer(config.server, db, ethClient, ethProvider, jobQueue, graphWatcher);
await indexer.init();
graphWatcher.setIndexer(indexer);
await graphWatcher.init();
const blockHash = await indexer.processCLICheckpoint(argv.address, argv.blockHash);
log(`Created a checkpoint for contract ${argv.address} at block-hash ${blockHash}`);
await db.close();
const main = async () => {
return yargs(hideBin(process.argv))
.parserConfiguration({
'parse-numbers': false
}).options({
configFile: {
alias: 'f',
type: 'string',
require: true,
demandOption: true,
describe: 'configuration file path (toml)',
default: DEFAULT_CONFIG_PATH
}
})
.commandDir('checkpoint-cmds', { extensions: ['ts', 'js'], exclude: /([a-zA-Z0-9\s_\\.\-:])+(.d.ts)$/ })
.demandCommand(1)
.help()
.argv;
};
main().catch(err => {
main().then(() => {
process.exit();
}).catch(err => {
log(err);
}).finally(() => {
process.exit(0);
});

View File

@ -9,7 +9,7 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind, verifyCheckpointData } from '@cerc-io/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import * as codec from '@ipld/dag-cbor';
@ -35,17 +35,9 @@ const main = async (): Promise<void> => {
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
},
verify: {
alias: 'v',
type: 'boolean',
describe: 'Verify checkpoint',
default: true
blockNumber: {
type: 'number',
describe: 'Block number to create snapshot at'
}
}).argv;
@ -82,11 +74,25 @@ const main = async (): Promise<void> => {
};
const contracts = await db.getContracts();
// Get latest block with hooks processed.
const block = await indexer.getLatestHooksProcessedBlock();
let block = await indexer.getLatestHooksProcessedBlock();
assert(block);
if (argv.blockNumber) {
if (argv.blockNumber > block.blockNumber) {
throw new Error(`Export snapshot block height ${argv.blockNumber} should be less than latest hooks processed block height ${block.blockNumber}`);
}
const blocksAtSnapshotHeight = await indexer.getBlocksAtHeight(argv.blockNumber, false);
if (!blocksAtSnapshotHeight.length) {
throw new Error(`No blocks at snapshot height ${argv.blockNumber}`);
}
block = blocksAtSnapshotHeight[0];
}
log(`Creating export snapshot at block height ${block.blockNumber}`);
// Export snapshot block.
exportData.snapshotBlock = {
blockNumber: block.blockNumber,
@ -104,22 +110,13 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
await indexer.createCheckpoint(contract.address, block.blockHash);
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);
const data = indexer.getIPLDData(ipldBlock);
if (argv.verify) {
log(`Verifying checkpoint data for contract ${contract.address}`);
await verifyCheckpointData(graphDb, ipldBlock.block, data);
log('Checkpoint data verified');
}
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}

View File

@ -85,6 +85,7 @@ export const main = async (): Promise<any> => {
eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
@ -108,12 +109,23 @@ export const main = async (): Promise<any> => {
ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
ipldBlock = await indexer.saveOrUpdateIPLDBlock(ipldBlock);
await graphWatcher.updateEntitiesFromIPLDState(ipldBlock);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
await indexer.updateIPLDStatusHooksBlock(block.blockNumber);
await indexer.updateIPLDStatusCheckpointBlock(block.blockNumber);
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
};
main().catch(err => {

View File

@ -9,6 +9,8 @@ import _ from 'lodash';
import { Indexer, ResultEvent } from './indexer';
const IPLD_BATCH_BLOCKS = 10000;
/**
* Hook function to store an initial state.
* @param indexer Indexer instance.
@ -91,22 +93,28 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
state: prevNonDiffBlockData.state
};
// Merge all diff blocks after previous checkpoint.
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
data.state = _.merge(data.state, diff.state);
}
// Check if Block entity exists.
if (data.state.Block) {
// Store only block entity at checkpoint height instead of all entities.
data.state.Block = {
[blockHash]: data.state.Block[blockHash]
};
console.time('time:hooks#createStateCheckpoint');
for (let i = diffStartBlockNumber; i < block.blockNumber;) {
const endBlockHeight = Math.min(i + IPLD_BATCH_BLOCKS, block.blockNumber);
console.time(`time:hooks#createStateCheckpoint-batch-merge-diff-${i}-${endBlockHeight}`);
const diffBlocks = await indexer.getDiffIPLDBlocksInRange(contractAddress, i, endBlockHeight);
// Merge all diff blocks after previous checkpoint.
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
data.state = _.merge(data.state, diff.state);
}
console.timeEnd(`time:hooks#createStateCheckpoint-batch-merge-diff-${i}-${endBlockHeight}`);
i = endBlockHeight;
}
console.time('time:hooks#createStateCheckpoint-db-save-checkpoint');
await indexer.createStateCheckpoint(contractAddress, blockHash, data);
console.timeEnd('time:hooks#createStateCheckpoint-db-save-checkpoint');
console.timeEnd('time:hooks#createStateCheckpoint');
return true;
}

View File

@ -158,7 +158,7 @@ GQL console: http://localhost:3006/graphql
yarn export-state --export-file [export-file-path]
```
* `export-file`: Path of JSON file to which to export the watcher data.
* `export-file`: Path of file to which to export the watcher data.
* In target watcher, run job-runner:
@ -172,15 +172,7 @@ GQL console: http://localhost:3006/graphql
yarn import-state --import-file <import-file-path>
```
* `import-file`: Path of JSON file from which to import the watcher data.
* Run fill:
```bash
yarn fill --start-block <snapshot-block> --end-block <to-block>
```
* `snapshot-block`: Block number at which the watcher state was exported.
* `import-file`: Path of file from which to import the watcher data.
* Run server:

View File

@ -33,12 +33,6 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
}
}).argv;
@ -89,10 +83,7 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
await indexer.createCheckpoint(contract.address, block.blockHash);
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -76,6 +76,7 @@ export const main = async (): Promise<any> => {
eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
@ -102,9 +103,17 @@ export const main = async (): Promise<any> => {
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
};
main().catch(err => {

View File

@ -182,7 +182,7 @@ export const main = async (): Promise<void> => {
}
} catch (err: any) {
log('Error:', err.message);
log('Error:', err);
log('Error:', JSON.stringify(err, null, 2));
}
}

View File

@ -276,7 +276,7 @@ export const combineIPLDState = (contractIPLDs: {[key: string]: any}[]): {[key:
const data = JSON.parse(contractIPLD.data);
// Apply default limit on array type relation fields.
// Apply default limit and sort by id on array type relation fields.
Object.values(data.state)
.forEach((idEntityMap: any) => {
Object.values(idEntityMap)
@ -288,6 +288,7 @@ export const combineIPLDState = (contractIPLDs: {[key: string]: any}[]): {[key:
fieldValue.length &&
fieldValue[0].id
) {
fieldValue.sort((a: any, b: any) => a.id.localeCompare(b.id));
fieldValue.splice(DEFAULT_LIMIT);
}
});
@ -323,7 +324,7 @@ export const checkEntityInIPLDState = async (
}
});
const diff = compareObjects(ipldEntity, resultEntity, rawJson);
const diff = compareObjects(resultEntity, ipldEntity, rawJson);
return diff;
};

View File

@ -21,7 +21,7 @@ import {
Where
} from '@cerc-io/util';
import { Block, fromEntityValue, toEntityValue } from './utils';
import { Block, fromEntityValue, fromStateEntityValues, toEntityValue } from './utils';
export const DEFAULT_LIMIT = 100;
@ -508,6 +508,43 @@ export class Database {
}, {});
}
fromIPLDState (block: BlockProgressInterface, entity: string, stateEntity: any, relations: { [key: string]: any } = {}): any {
const repo = this._conn.getRepository(entity);
const entityFields = repo.metadata.columns;
return this.getStateEntityValues(block, stateEntity, entityFields, relations);
}
getStateEntityValues (block: BlockProgressInterface, stateEntity: any, entityFields: any, relations: { [key: string]: any } = {}): { [key: string]: any } {
const entityValues = entityFields.map((field: any) => {
const { propertyName } = field;
// Get blockHash property for db entry from block instance.
if (propertyName === 'blockHash') {
return block.blockHash;
}
// Get blockNumber property for db entry from block instance.
if (propertyName === 'blockNumber') {
return block.blockNumber;
}
// Get blockNumber as _blockNumber and blockHash as _blockHash from the entityInstance (wasm).
if (['_blockNumber', '_blockHash'].includes(propertyName)) {
return fromStateEntityValues(stateEntity, propertyName.slice(1), relations);
}
return fromStateEntityValues(stateEntity, propertyName, relations);
}, {});
return entityFields.reduce((acc: { [key: string]: any }, field: any, index: number) => {
const { propertyName } = field;
acc[propertyName] = entityValues[index];
return acc;
}, {});
}
async getBlocksAtHeight (height: number, isPruned: boolean) {
const repo: Repository<BlockProgressInterface> = this._conn.getRepository('block_progress');

View File

@ -821,9 +821,7 @@ export const prepareEntityState = (updatedEntity: any, entityName: string, relat
}
if (isArray) {
updatedEntity[relation] = updatedEntity[relation]
.map((id: string) => ({ id }))
.sort((a: any, b: any) => a.id.localeCompare(b.id));
updatedEntity[relation] = updatedEntity[relation].map((id: string) => ({ id }));
} else {
updatedEntity[relation] = { id: updatedEntity[relation] };
}
@ -841,3 +839,20 @@ export const prepareEntityState = (updatedEntity: any, entityName: string, relat
return diffData;
};
export const fromStateEntityValues = (stateEntity: any, propertyName: string, relations: { [key: string]: any } = {}): any => {
// Parse DB data value from state entity data.
if (relations) {
const relation = relations[propertyName];
if (relation) {
if (relation.isArray) {
return stateEntity[propertyName].map((relatedEntity: { id: string }) => relatedEntity.id);
} else {
return stateEntity[propertyName]?.id;
}
}
}
return stateEntity[propertyName];
};

View File

@ -11,7 +11,7 @@ import { ContractInterface, utils, providers } from 'ethers';
import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { IndexerInterface, getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions } from '@cerc-io/util';
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IPLDIndexerInterface } from '@cerc-io/util';
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils';
import { Context, GraphData, instantiate } from './loader';
@ -27,7 +27,7 @@ interface DataSource {
export class GraphWatcher {
_database: Database;
_indexer?: IndexerInterface;
_indexer?: IPLDIndexerInterface;
_ethClient: EthClient;
_ethProvider: providers.BaseProvider;
_subgraphPath: string;
@ -253,7 +253,7 @@ export class GraphWatcher {
}
}
setIndexer (indexer: IndexerInterface): void {
setIndexer (indexer: IPLDIndexerInterface): void {
this._indexer = indexer;
}
@ -326,6 +326,30 @@ export class GraphWatcher {
}
}
async updateEntitiesFromIPLDState (ipldBlock: IPLDBlockInterface) {
assert(this._indexer);
const data = this._indexer.getIPLDData(ipldBlock);
for (const [entityName, entities] of Object.entries(data.state)) {
// Get relations for subgraph entity
assert(this._indexer.getRelationsMap);
const relationsMap = this._indexer.getRelationsMap();
const result = Array.from(relationsMap.entries())
.find(([key]) => key.name === entityName);
const relations = result ? result[1] : {};
log(`Updating entities from IPLD state for entity ${entityName}`);
console.time(`time:watcher#GraphWatcher-updateEntitiesFromIPLDState-IPLD-update-entity-${entityName}`);
for (const [id, entityData] of Object.entries(entities as any)) {
const dbData = this._database.fromIPLDState(ipldBlock.block, entityName, entityData, relations);
await this._database.saveEntity(entityName, dbData);
}
console.timeEnd(`time:watcher#GraphWatcher-updateEntitiesFromIPLDState-IPLD-update-entity-${entityName}`);
}
}
/**
* Method to reinstantiate WASM instance for specified dataSource.
* @param dataSourceName

View File

@ -152,7 +152,7 @@ GQL console: http://localhost:3008/graphql
yarn export-state --export-file [export-file-path]
```
* `export-file`: Path of JSON file to which to export the watcher data.
* `export-file`: Path of file to which to export the watcher data.
* In target watcher, run job-runner:
@ -166,15 +166,7 @@ GQL console: http://localhost:3008/graphql
yarn import-state --import-file <import-file-path>
```
* `import-file`: Path of JSON file from which to import the watcher data.
* Run fill:
```bash
yarn fill --start-block <snapshot-block> --end-block <to-block>
```
* `snapshot-block`: Block number at which the watcher state was exported.
* `import-file`: Path of file from which to import the watcher data.
* Run server:

View File

@ -35,12 +35,6 @@ const main = async (): Promise<void> => {
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
},
verify: {
alias: 'v',
type: 'boolean',
@ -104,10 +98,7 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
await indexer.createCheckpoint(contract.address, block.blockHash);
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -85,6 +85,7 @@ export const main = async (): Promise<any> => {
eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
@ -108,12 +109,21 @@ export const main = async (): Promise<any> => {
ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
ipldBlock = await indexer.saveOrUpdateIPLDBlock(ipldBlock);
await graphWatcher.updateEntitiesFromIPLDState(ipldBlock);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
};
main().catch(err => {

View File

@ -152,7 +152,7 @@ GQL console: http://localhost:3010/graphql
yarn export-state --export-file [export-file-path]
```
* `export-file`: Path of JSON file to which to export the watcher data.
* `export-file`: Path of file to which to export the watcher data.
* In target watcher, run job-runner:
@ -166,15 +166,7 @@ GQL console: http://localhost:3010/graphql
yarn import-state --import-file <import-file-path>
```
* `import-file`: Path of JSON file from which to import the watcher data.
* Run fill:
```bash
yarn fill --start-block <snapshot-block> --end-block <to-block>
```
* `snapshot-block`: Block number at which the watcher state was exported.
* `import-file`: Path of file from which to import the watcher data.
* Run server:

View File

@ -33,12 +33,6 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
}
}).argv;
@ -89,10 +83,7 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
await indexer.createCheckpoint(contract.address, block.blockHash);
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -76,6 +76,7 @@ export const main = async (): Promise<any> => {
eventWatcher,
config.upstream.ethServer.blockDelayInMilliSecs,
{
prefetch: true,
startBlock: importData.snapshotBlock.blockNumber,
endBlock: importData.snapshotBlock.blockNumber
}
@ -102,9 +103,17 @@ export const main = async (): Promise<any> => {
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
}
// Mark snapshot block as completely processed.
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
await indexer.updateSyncStatusChainHead(block.blockHash, block.blockNumber);
await indexer.updateSyncStatusIndexedBlock(block.blockHash, block.blockNumber);
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
log(`Import completed for snapshot block at height ${block.blockNumber}`);
};
main().catch(err => {

View File

@ -12,16 +12,18 @@
"ipfs-http-client": "^56.0.3",
"lodash": "^4.17.21",
"multiformats": "^9.4.8",
"pg": "^8.5.1",
"pg-boss": "^6.1.0",
"prom-client": "^14.0.1",
"toml": "^3.0.0"
},
"devDependencies": {
"@types/fs-extra": "^9.0.11",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",
"@cerc-io/cache": "^0.1.0",
"@cerc-io/ipld-eth-client": "^0.1.0",
"@types/fs-extra": "^9.0.11",
"@types/pg": "^8.6.5",
"@typescript-eslint/eslint-plugin": "^4.25.0",
"@typescript-eslint/parser": "^4.25.0",
"apollo-server-express": "^2.25.0",
"decimal.js": "^10.3.1",
"eslint": "^7.27.0",

View File

@ -2,14 +2,30 @@
// Copyright 2021 Vulcanize, Inc.
//
import { Between, FindConditions, Repository } from 'typeorm';
import { Between, ConnectionOptions, FindConditions, Repository } from 'typeorm';
import assert from 'assert';
import { Pool } from 'pg';
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
import { Database } from './database';
import { MAX_REORG_DEPTH } from './constants';
export class IPLDDatabase extends Database {
_pgPool: Pool
constructor (config: ConnectionOptions) {
super(config);
assert(config.type === 'postgres');
this._pgPool = new Pool({
user: config.username,
host: config.host,
database: config.database,
password: config.password,
port: config.port
});
}
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
let queryBuilder = repo.createQueryBuilder('ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
@ -150,7 +166,39 @@ export class IPLDDatabase extends Database {
}
async saveOrUpdateIPLDBlock (repo: Repository<IPLDBlockInterface>, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
return repo.save(ipldBlock);
let updatedData: {[key: string]: any};
console.time('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
if (ipldBlock.id) {
// Using pg query as workaround for typeorm memory issue when saving checkpoint with large sized data.
const { rows } = await this._pgPool.query(`
UPDATE ipld_block
SET block_id = $1, contract_address = $2, cid = $3, kind = $4, data = $5
WHERE id = $6
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data, ipldBlock.id]);
updatedData = rows[0];
} else {
const { rows } = await this._pgPool.query(`
INSERT INTO ipld_block(block_id, contract_address, cid, kind, data)
VALUES($1, $2, $3, $4, $5)
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data]);
updatedData = rows[0];
}
console.timeEnd('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
assert(updatedData);
return {
block: ipldBlock.block,
contractAddress: updatedData.contract_address,
cid: updatedData.cid,
kind: updatedData.kind,
data: updatedData.data,
id: updatedData.id
};
}
async removeIPLDBlocks (repo: Repository<IPLDBlockInterface>, blockNumber: number, kind: string): Promise<void> {

View File

@ -348,6 +348,7 @@ export class IPLDIndexer extends Indexer {
}
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise<any> {
console.time('time:ipld-indexer#prepareIPLDBlock');
let ipldBlock: IPLDBlockInterface;
// Get IPLD status for the contract.
@ -415,6 +416,7 @@ export class IPLDIndexer extends Indexer {
data: Buffer.from(bytes)
});
console.timeEnd('time:ipld-indexer#prepareIPLDBlock');
return ipldBlock;
}

View File

@ -118,6 +118,7 @@ export interface IndexerInterface {
export interface IPLDIndexerInterface extends IndexerInterface {
updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise<void>
getIPLDData (ipldBlock: IPLDBlockInterface): any
}
export interface EventWatcherInterface {

View File

@ -2571,6 +2571,15 @@
dependencies:
"@types/node" "*"
"@types/pg@^8.6.5":
version "8.6.5"
resolved "https://registry.yarnpkg.com/@types/pg/-/pg-8.6.5.tgz#2dce9cb468a6a5e0f1296a59aea3ac75dd27b702"
integrity sha512-tOkGtAqRVkHa/PVZicq67zuujI4Oorfglsr2IbKofDwBSysnaqSx7W1mDqFqdkGE6Fbgh+PZAl0r/BWON/mozw==
dependencies:
"@types/node" "*"
pg-protocol "*"
pg-types "^2.2.0"
"@types/pluralize@^0.0.29":
version "0.0.29"
resolved "https://registry.yarnpkg.com/@types/pluralize/-/pluralize-0.0.29.tgz#6ffa33ed1fc8813c469b859681d09707eb40d03c"
@ -11391,12 +11400,12 @@ pg-pool@^3.3.0:
resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.3.0.tgz#12d5c7f65ea18a6e99ca9811bd18129071e562fc"
integrity sha512-0O5huCql8/D6PIRFAlmccjphLYWC+JIzvUhSzXSpGaf+tjTZc4nn+Lr7mLXBbFJfvwbP0ywDv73EiaBsxn7zdg==
pg-protocol@^1.5.0:
pg-protocol@*, pg-protocol@^1.5.0:
version "1.5.0"
resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0"
integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==
pg-types@^2.1.0:
pg-types@^2.1.0, pg-types@^2.2.0:
version "2.2.0"
resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==