Accommodate GQL optimizations in codegen (#254)

* Prune optional methods in indexer and database interfaces

* Implement GQL optimization changes in codegen

* Fix graph-node test indexer

* Add demos to codegen package
This commit is contained in:
nikugogoi 2022-11-23 17:42:25 +05:30 committed by GitHub
parent 0b33cc98c9
commit cc28474537
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 758 additions and 57 deletions

View File

@ -61,7 +61,6 @@ export class ResetStateCmd {
const { blockNumber } = this._argv;
try {
// Delete all State entries after the given block
assert(this._database.removeStatesAfterBlock);
await this._database.removeStatesAfterBlock(dbTx, blockNumber);
// Reset the stateSyncStatus.

View File

@ -18,6 +18,14 @@
## Run
Follow the steps below or follow the demos:
* [Subgraph watcher](./subgraph-demo.md)
* [Non subgraph watcher](./non-subgraph-demo.md)
Steps:
* Create a `.yaml` config file in the following format for generating a watcher:
```yaml

View File

@ -0,0 +1,348 @@
# Subgraph watcher demo
* Clone the [stack-orchestrator](https://github.com/vulcanize/stack-orchestrator) repo.
```bash
git clone https://github.com/vulcanize/stack-orchestrator
```
* Create a `config.sh` file.
```bash
cd stack-orchestrator/helper-scripts
./create-config.sh
```
* Setup the required repositories.
```bash
./setup-repositories.sh -p ssh
```
* Checkout [v4 release](https://github.com/cerc-io/go-ethereum/releases/tag/v1.10.26-statediff-4.2.2-alpha) in go-ethereum repo. The path for go-ethereum is specified by `vulcanize_go_ethereum` variable in `config.sh` file created in stack-orchestrator repo.
```bash
# In go-ethereum repo.
git checkout v1.10.26-statediff-4.2.2-alpha
```
* Update to use latest images for ipld-eth-db and ipld-eth-server
* In [docker/latest/docker-compose-db-sharding.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-db-sharding.yml) update image version
```yml
services:
migrations:
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v4.2.3-alpha
```
* In [docker/latest/docker-compose-ipld-eth-server.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-ipld-eth-server.yml) update image version
```yml
services:
ipld-eth-server:
image: git.vdb.to/cerc-io/ipld-eth-server/ipld-eth-server:v4.2.3-alpha
```
* To run the stack-orchestrator, the docker-compose version used is:
```bash
docker-compose version
# docker-compose version 1.29.2, build 5becea4c
```
* Run the stack-orchestrator
```bash
cd stack-orchestrator/helper-scripts
```
```bash
./wrapper.sh -f true \
-m true \
-s v4 \
-l latest \
-v remove \
-p ../config.sh
```
* In [packages/codegen](./), create a `config.yaml` file:
```yaml
# Config to generate demo-erc721-watcher using codegen.
# Contracts to watch (required).
contracts:
# Contract name.
- name: ERC721
# Contract file path or an url.
path: ../../node_modules/@openzeppelin/contracts/token/ERC721/ERC721.sol
# Contract kind
kind: ERC721
# Output folder path (logs output using `stdout` if not provided).
outputFolder: ../demo-erc721-watcher
# Code generation mode [eth_call | storage | all | none] (default: none).
mode: all
# Kind of watcher [lazy | active] (default: active).
kind: active
# Watcher server port (default: 3008).
port: 3009
# Flatten the input contract file(s) [true | false] (default: true).
flatten: true
```
* Run codegen to generate watcher:
```bash
yarn codegen --config-file ./config.yaml
```
The watcher should be generated in `packages/demo-erc721-watcher`
* Create a postgres12 database for the watcher:
```bash
sudo su - postgres
# If database already exists
# dropdb demo-erc721-watcher
createdb demo-erc721-watcher
```
* Create database for the job queue and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro):
```bash
# If database already exists
# dropdb demo-erc721-watcher-job-queue
createdb demo-erc721-watcher-job-queue
```
```
postgres@tesla:~$ psql -U postgres -h localhost demo-erc721-watcher-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
demo-erc721-watcher-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
demo-erc721-watcher-job-queue=# exit
```
## Custom hooks:
For generating default state for `ERC721` from the indexer methods, replace the `handleEvent` hook in `demo-erc721-watcher/src/hooks.ts` file with:
```ts
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
assert(indexer);
assert(eventData);
// Perform indexing based on the type of event.
switch (eventData.event.__typename) {
case 'TransferEvent': {
// Get event fields from eventData.
const { from, to, tokenId } = eventData.event;
// Update balance entry for the sender in database.
if (from !== '0x0000000000000000000000000000000000000000') {
await indexer._balances(eventData.block.hash, eventData.contract, from, true);
}
// Update balance entry for the receiver in database.
if (to !== '0x0000000000000000000000000000000000000000') {
await indexer._balances(eventData.block.hash, eventData.contract, to, true);
}
// Update owner for the tokenId in database.
await indexer._owners(eventData.block.hash, eventData.contract, tokenId, true);
break;
}
case 'ApprovalEvent': {
// Get event fields from eventData.
const { tokenId } = eventData.event;
// Update tokenApprovals for the tokenId in database.
await indexer._tokenApprovals(eventData.block.hash, eventData.contract, tokenId, true);
break;
}
case 'ApprovalForAllEvent': {
// Get event fields from eventData.
const { owner, operator } = eventData.event;
// Update operatorApprovals for the tokenId in database.
await indexer._operatorApprovals(eventData.block.hash, eventData.contract, owner, operator, true);
break;
}
}
}
```
Here, the `diff` is passed as true to indexer methods to store default state.
* In `watcher-ts` repo, follow the instructions in [Setup](../../README.md#setup) for installing and building packages.
```bash
# After setup
yarn && yarn build
```
* In `packages/demo-erc721-watcher`, run the job-runner:
```bash
yarn job-runner
```
* Run the watcher:
```bash
yarn server
```
## Operations
Run the following in [packages/erc721-watcher](../erc721-watcher/):
* Get the signer account address and export to a shell variable:
```bash
yarn account
```
```bash
export SIGNER_ADDRESS="<SIGNER_ADDRESS>"
```
* Connect MetaMask to `http://localhost:8545` (with chain ID `99`)
* Add a second account to Metamask and export the account address to a shell variable for later use:
```bash
export RECIPIENT_ADDRESS="<RECIPIENT_ADDRESS>"
```
* Deploy token:
```bash
yarn nft:deploy
```
* Set the returned address to the variable `$NFT_ADDRESS`:
```bash
NFT_ADDRESS=<NFT_ADDRESS>
```
* Run the following GQL mutation in generated watcher graphql endpoint http://127.0.0.1:3009/graphql
```graphql
mutation {
watchContract(
address: "NFT_ADDRESS"
kind: "ERC721"
checkpoint: true
)
}
```
* Run the following GQL subscription in generated watcher graphql endpoint:
```graphql
subscription {
onEvent {
event {
__typename
... on TransferEvent {
from
to
tokenId
},
... on ApprovalEvent {
owner
approved
tokenId
}
},
block {
number
hash
}
}
}
```
* Mint token:
```bash
yarn nft:mint --nft $NFT_ADDRESS --to $SIGNER_ADDRESS --token-id 1
```
* A `Transfer` event to `$SIGNER_ADDRESS` shall be visible in the subscription at endpoint.
* An auto-generated `diff` entry `State` should be added with `parent` cid pointing to the initial checkpoint `State`.
* Run the `getState` query at the endpoint to get the latest `State` for `NFT_ADDRESS`:
```graphql
query {
getState (
blockHash: "EVENT_BLOCK_HASH"
contractAddress: "NFT_ADDRESS"
# kind: "checkpoint"
kind: "diff"
) {
cid
block {
cid
hash
number
timestamp
parentHash
}
contractAddress
data
}
}
```
* Transfer token:
```bash
yarn nft:transfer --nft $NFT_ADDRESS --from $SIGNER_ADDRESS --to $RECIPIENT_ADDRESS --token-id 1
```
* An `Approval` event for `ZERO_ADDRESS` shall be visible in the subscription at endpoint.
* A `Transfer` event to `$RECIPIENT_ADDRESS` shall be visible in the subscription at endpoint.
* An auto-generated `diff` entry `State` should be added with `parent` cid pointing to the previous `State`.
* Run the `getState` query again at the endpoint with event blockHash.
* Get the latest `blockHash`:
```bash
yarn block:latest
```
* In `packages/demo-erc721-watcher`, create a checkpoint using CLI:
```bash
yarn checkpoint create --address $NFT_ADDRESS
```
* Run the `getState` query again with the output blockHash and kind `checkpoint` at the endpoint.
* The latest checkpoint should have the aggregate of state diffs since the last checkpoint.
* The `State` entries can be seen in `pg-admin` in table `state`.

View File

@ -253,6 +253,20 @@ export class Entity {
// Add subgraph entity specific columns.
entityObject = this._addSubgraphColumns(subgraphTypeDefs, entityObject, def);
// Add is_pruned column.
entityObject.columns.push({
name: 'isPruned',
pgType: 'boolean',
tsType: 'boolean',
columnType: 'Column',
columnOptions: [
{
option: 'default',
value: false
}
]
});
// Add decimalTransformer column option if required.
this._addDecimalTransformerOption(entityObject);

View File

@ -40,13 +40,12 @@ export const handler = async (argv: any): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
{{/if}}
{{/if}}
const jobQueueConfig = config.jobQueue;
assert(jobQueueConfig, 'Missing job queue config');

View File

@ -9,7 +9,7 @@ import assert from 'assert';
import { getConfig, initClients, JobQueue, Config, verifyCheckpointData } from '@cerc-io/util';
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { Database } from '../../database';
import { Database, ENTITY_TO_LATEST_ENTITY_MAP, ENTITY_QUERY_TYPE_MAP } from '../../database';
import { Indexer } from '../../indexer';
const log = debug('vulcanize:checkpoint-verify');
@ -34,7 +34,7 @@ export const handler = async (argv: any): Promise<void> => {
const db = new Database(config.database);
await db.init();
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -7,6 +7,9 @@ import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner
import path from 'path';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
{{#if (subgraphPath)}}
import { ENTITY_QUERY_TYPE } from '@cerc-io/graph-node';
{{/if}}
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -21,11 +24,23 @@ import { {{query.entityName}} } from './entity/{{query.entityName}}';
import { {{subgraphEntity.className}} } from './entity/{{subgraphEntity.className}}';
{{/each}}
export const ENTITIES = [
{{~#each queries as | query |}}{{query.entityName}}, {{/each}}
{{#if (subgraphPath)}}
export const SUBGRAPH_ENTITIES = new Set([
{{~#each subgraphEntities as | subgraphEntity |}}{{subgraphEntity.className}}
{{~#unless @last}}, {{/unless}}
{{~/each}}]);
{{/if}}
export const ENTITIES = [
{{~#if (subgraphPath)}}...SUBGRAPH_ENTITIES, {{/if}}
{{~#each queries as | query |}}{{query.entityName}}
{{~#unless @last}}, {{/unless}}
{{~/each}}];
{{#if (subgraphPath)}}
// Map: Entity to suitable query type.
export const ENTITY_QUERY_TYPE_MAP = new Map<new() => any, ENTITY_QUERY_TYPE>([]);
export const ENTITY_TO_LATEST_ENTITY_MAP: Map<any, any> = new Map();
{{/if}}
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
@ -38,6 +53,9 @@ export class Database implements DatabaseInterface {
this._config = {
...config,
{{#if (subgraphPath)}}
subscribers: [path.join(__dirname, 'entity/Subscriber.*')],
{{/if}}
entities: [path.join(__dirname, 'entity/*')]
};

View File

@ -24,7 +24,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:export-state');
@ -59,7 +59,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -7,14 +7,14 @@ import 'reflect-metadata';
import yargs from 'yargs';
import { hideBin } from 'yargs/helpers';
import debug from 'debug';
import { PubSub } from 'graphql-subscriptions';
import { PubSub } from 'graphql-subscriptions';
import { Config, getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, initClients } from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
import { Database } from './database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database';
import { Indexer } from './indexer';
import { EventWatcher } from './events';
{{#if (subgraphPath)}}
@ -72,7 +72,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -17,7 +17,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
import * as codec from '@ipld/dag-cbor';
import { Database } from '../database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database';
import { Indexer } from '../indexer';
import { EventWatcher } from '../events';
import { State } from '../entity/State';
@ -50,7 +50,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -12,7 +12,7 @@ import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, indexBlo
import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
import { Database } from '../database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from '../database';
import { Indexer } from '../indexer';
const log = debug('vulcanize:index-block');
@ -44,7 +44,7 @@ const main = async (): Promise<void> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -41,7 +41,7 @@ import { GraphWatcher } from '@cerc-io/graph-node';
{{#each contracts as | contract |}}
import {{contract.contractName}}Artifacts from './artifacts/{{contract.contractName}}.json';
{{/each}}
import { Database, ENTITIES } from './database';
import { Database, ENTITIES{{#if (subgraphPath)}}, SUBGRAPH_ENTITIES{{/if}} } from './database';
import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint } from './hooks';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -104,17 +104,17 @@ export class Indexer implements IndexerInterface {
this._abiMap = new Map();
this._storageLayoutMap = new Map();
this._contractMap = new Map();
{{#each contracts as | contract |}}
const {
abi: {{contract.contractName}}ABI,
{{#if contract.contractStorageLayout}}
storageLayout: {{contract.contractName}}StorageLayout
{{/if}}
} = {{contract.contractName}}Artifacts;
{{/each}}
{{#each contracts as | contract |}}
assert({{contract.contractName}}ABI);
this._abiMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}ABI);
{{#if contract.contractStorageLayout}}
@ -122,9 +122,9 @@ export class Indexer implements IndexerInterface {
this._storageLayoutMap.set(KIND_{{capitalize contract.contractName}}, {{contract.contractName}}StorageLayout);
{{/if}}
this._contractMap.set(KIND_{{capitalize contract.contractName}}, new ethers.utils.Interface({{contract.contractName}}ABI));
{{/each}}
{{#if (subgraphPath)}}
this._entityTypesMap = new Map();
this._populateEntityTypesMap();
@ -259,8 +259,10 @@ export class Indexer implements IndexerInterface {
}
async processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void> {
console.time('time:indexer#processCanonicalBlock-finalize_auto_diffs');
// Finalize staged diff blocks if any.
await this._baseIndexer.finalizeDiffStaged(blockHash);
console.timeEnd('time:indexer#processCanonicalBlock-finalize_auto_diffs');
// Call custom stateDiff hook.
await createStateDiff(this, blockHash);
@ -275,7 +277,9 @@ export class Indexer implements IndexerInterface {
const checkpointInterval = this._serverConfig.checkpointInterval;
if (checkpointInterval <= 0) return;
console.time('time:indexer#processCheckpoint-checkpoint');
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
console.timeEnd('time:indexer#processCheckpoint-checkpoint');
}
async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> {
@ -308,7 +312,9 @@ export class Indexer implements IndexerInterface {
// Method used to create auto diffs (diff_staged).
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
console.time('time:indexer#createDiffStaged-auto_diff');
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
console.timeEnd('time:indexer#createDiffStaged-auto_diff');
}
// Method to be used by createStateDiff hook.
@ -361,13 +367,25 @@ export class Indexer implements IndexerInterface {
return data;
}
async getSubgraphEntities<Entity> (
entity: new () => Entity,
block: BlockHeight,
where: { [key: string]: any } = {},
queryOptions: QueryOptions = {},
selections: ReadonlyArray<SelectionNode> = []
): Promise<any[]> {
return this._graphWatcher.getEntities(entity, this._relationsMap, block, where, queryOptions, selections);
}
{{/if}}
async triggerIndexingOnEvent (event: Event): Promise<void> {
const resultEvent = this.getResultEvent(event);
{{#if (subgraphPath)}}
console.time('time:indexer#processEvent-mapping_code');
// Call subgraph handler for event.
await this._graphWatcher.handleEvent(resultEvent);
console.timeEnd('time:indexer#processEvent-mapping_code');
{{/if}}
// Call custom hook function for indexing on event.
@ -380,8 +398,10 @@ export class Indexer implements IndexerInterface {
}
async processBlock (blockProgress: BlockProgress): Promise<void> {
console.time('time:indexer#processBlock-init_state');
// Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
console.timeEnd('time:indexer#processBlock-init_state');
{{#if (subgraphPath)}}
this._graphWatcher.updateEntityCacheFrothyBlocks(blockProgress);
@ -390,11 +410,15 @@ export class Indexer implements IndexerInterface {
{{#if (subgraphPath)}}
async processBlockAfterEvents (blockHash: string, blockNumber: number): Promise<void> {
console.time('time:indexer#processBlockAfterEvents-mapping_code');
// Call subgraph handler for block.
await this._graphWatcher.handleBlock(blockHash, blockNumber);
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
// Persist subgraph state to the DB.
await this.dumpSubgraphState(blockHash);
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
}
{{/if}}
@ -557,15 +581,23 @@ export class Indexer implements IndexerInterface {
}
async markBlocksAsPruned (blocks: BlockProgress[]): Promise<void> {
return this._baseIndexer.markBlocksAsPruned(blocks);
await this._baseIndexer.markBlocksAsPruned(blocks);
{{#if (subgraphPath)}}
await this._graphWatcher.pruneEntities(FrothyEntity, blocks, SUBGRAPH_ENTITIES);
{{/if}}
}
{{#if (subgraphPath)}}
async pruneFrothyEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.pruneFrothyEntities(FrothyEntity, blockNumber);
}
{{/if}}
async resetLatestEntities (blockNumber: number): Promise<void> {
await this._graphWatcher.resetLatestEntities(blockNumber);
}
{{/if}}
async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex);
}
@ -581,9 +613,13 @@ export class Indexer implements IndexerInterface {
const entities = [...ENTITIES];
{{/if}}
await this._baseIndexer.resetWatcherToBlock(blockNumber, entities);
}
{{#if (subgraphPath)}}
await this.resetLatestEntities(blockNumber);
{{/if}}
}
{{#if (subgraphPath)}}
getEntityTypesMap (): Map<string, { [key: string]: string }> {
return this._entityTypesMap;
}

View File

@ -28,7 +28,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
{{/if}}
import { Indexer } from './indexer';
import { Database } from './database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database';
const log = debug('vulcanize:job-runner');
@ -97,7 +97,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -12,10 +12,12 @@ import { ValueResult, BlockHeight, gqlTotalQueryCount, gqlQueryCount, jsonBigInt
import { Indexer } from './indexer';
import { EventWatcher } from './events';
{{#if (subgraphPath)}}
{{#each subgraphQueries as | query |}}
import { {{query.entityName}} } from './entity/{{query.entityName}}';
{{/each}}
{{/if}}
const log = debug('vulcanize:resolver');

View File

@ -20,7 +20,7 @@ import { GraphWatcher, Database as GraphDatabase } from '@cerc-io/graph-node';
import { createResolvers } from './resolvers';
import { Indexer } from './indexer';
import { Database } from './database';
import { Database{{#if (subgraphPath)}}, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP{{/if}} } from './database';
import { EventWatcher } from './events';
const log = debug('vulcanize:server');
@ -45,7 +45,7 @@ export const main = async (): Promise<any> => {
await db.init();
{{#if (subgraphPath)}}
const graphDb = new GraphDatabase(config.server, db.baseDatabase);
const graphDb = new GraphDatabase(config.server, db.baseDatabase, ENTITY_QUERY_TYPE_MAP, ENTITY_TO_LATEST_ENTITY_MAP);
await graphDb.init();
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);

View File

@ -5,16 +5,16 @@
import { EventSubscriber, EntitySubscriberInterface, InsertEvent, UpdateEvent } from 'typeorm';
import { FrothyEntity } from './FrothyEntity';
import { ENTITIES } from '../database';
import { ENTITY_TO_LATEST_ENTITY_MAP, SUBGRAPH_ENTITIES } from '../database';
import { afterEntityInsertOrUpdate } from '@cerc-io/graph-node';
@EventSubscriber()
export class EntitySubscriber implements EntitySubscriberInterface {
async afterInsert (event: InsertEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event);
await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}
async afterUpdate (event: UpdateEvent<any>): Promise<void> {
await afterEntityInsertOrUpdate(FrothyEntity, ENTITIES, event);
await afterEntityInsertOrUpdate(FrothyEntity, SUBGRAPH_ENTITIES, event, ENTITY_TO_LATEST_ENTITY_MAP);
}
}

View File

@ -1,8 +1,8 @@
//
// Copyright 2021 Vulcanize, Inc.
//
{{#each types as | type |}}
export enum {{type.name}} {
{{#each type.values as | value |}}
{{value}} = '{{value}}',

View File

@ -0,0 +1,270 @@
# Subgraph watcher demo
* Clone the [stack-orchestrator](https://github.com/vulcanize/stack-orchestrator) repo.
```bash
git clone https://github.com/vulcanize/stack-orchestrator
```
* Create a `config.sh` file.
```bash
cd stack-orchestrator/helper-scripts
./create-config.sh
```
* Setup the required repositories.
```bash
./setup-repositories.sh -p ssh
```
* Checkout [v4 release](https://github.com/cerc-io/go-ethereum/releases/tag/v1.10.26-statediff-4.2.2-alpha) in go-ethereum repo. The path for go-ethereum is specified by `vulcanize_go_ethereum` variable in `config.sh` file created in stack-orchestrator repo.
```bash
# In go-ethereum repo.
git checkout v1.10.26-statediff-4.2.2-alpha
```
* Update to use latest images for ipld-eth-db and ipld-eth-server
* In [docker/latest/docker-compose-db-sharding.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-db-sharding.yml) update image version
```yml
services:
migrations:
image: git.vdb.to/cerc-io/ipld-eth-db/ipld-eth-db:v4.2.3-alpha
```
* In [docker/latest/docker-compose-ipld-eth-server.yml](https://github.com/vulcanize/stack-orchestrator/blob/main/docker/latest/docker-compose-ipld-eth-server.yml) update image version
```yml
services:
ipld-eth-server:
image: git.vdb.to/cerc-io/ipld-eth-server/ipld-eth-server:v4.2.3-alpha
```
* To run the stack-orchestrator, the docker-compose version used is:
```bash
docker-compose version
# docker-compose version 1.29.2, build 5becea4c
```
* Run the stack-orchestrator
```bash
cd stack-orchestrator/helper-scripts
```
```bash
./wrapper.sh -f true \
-m true \
-s v4 \
-l latest \
-v remove \
-p ../config.sh
```
* In watcher-ts [packages/graph-node](../graph-node/), deploy an `Example` contract:
```bash
yarn example:deploy
```
* Set the returned address to the variable `$EXAMPLE_ADDRESS`:
```bash
export EXAMPLE_ADDRESS=<EXAMPLE_ADDRESS>
```
* In [packages/graph-node/test/subgraph/example1/subgraph.yaml](../graph-node/test/subgraph/example1/subgraph.yaml), set the source address for `Example1` datasource to the `EXAMPLE_ADDRESS`. Then in [packages/graph-node](../graph-node/) run:
```bash
yarn build:example
```
* In [packages/codegen](./), create a `config.yaml` file:
```yaml
# Example config.yaml
# Contracts to watch (required).
# Can pass empty array ([]) when using subgraphPath.
contracts:
# Contract name.
- name: Example
# Contract file path or an url.
path: ../graph-node/test/contracts/Example.sol
# Contract kind (should match that in {subgraphPath}/subgraph.yaml if subgraphPath provided)
kind: Example1
# Output folder path (logs output using `stdout` if not provided).
outputFolder: ../test-watcher
# Code generation mode [eth_call | storage | all | none] (default: none).
mode: none
# Kind of watcher [lazy | active] (default: active).
kind: active
# Watcher server port (default: 3008).
port: 3008
# Flatten the input contract file(s) [true | false] (default: true).
flatten: true
# Path to the subgraph build (optional).
# Can set empty contracts array when using subgraphPath.
subgraphPath: ../graph-node/test/subgraph/example1/build
```
* Run codegen to generate watcher:
```bash
yarn codegen --config-file ./config.yaml
```
The watcher should be generated in `packages/test-watcher`
* Create a postgres12 database for the watcher:
```bash
sudo su - postgres
# If database already exists
# dropdb test-watcher
createdb test-watcher
```
* Create database for the job queue and enable the `pgcrypto` extension on them (https://github.com/timgit/pg-boss/blob/master/docs/usage.md#intro):
```bash
# If database already exists
# dropdb test-watcher-job-queue
createdb test-watcher-job-queue
```
```
postgres@tesla:~$ psql -U postgres -h localhost test-watcher-job-queue
Password for user postgres:
psql (12.7 (Ubuntu 12.7-1.pgdg18.04+1))
SSL connection (protocol: TLSv1.3, cipher: TLS_AES_256_GCM_SHA384, bits: 256, compression: off)
Type "help" for help.
test-watcher-job-queue=# CREATE EXTENSION pgcrypto;
CREATE EXTENSION
test-watcher-job-queue=# exit
```
* In `watcher-ts` repo, follow the instructions in [Setup](../../README.md#setup) for installing and building packages.
```bash
# After setup
yarn && yarn build
```
* In `packages/test-watcher`, run the job-runner:
```bash
yarn job-runner
```
* Run the watcher:
```bash
yarn server
```
## Operations
* Run the following GQL subscription at the [graphql endpoint](http://localhost:3008/graphql):
```graphql
subscription {
onEvent {
event {
__typename
... on TestEvent {
param1
param2
},
},
block {
number
hash
}
}
}
```
* In [packages/graph-node](../graph-node/), trigger the `Test` event by calling a example contract method:
```bash
yarn example:test --address $EXAMPLE_ADDRESS
```
* A `Test` event shall be visible in the subscription at endpoint.
* The subgraph entity `Category` should be updated in the database.
* An auto-generated `diff-staged` entry `State` should be added.
* Run the query for entity in at the endpoint:
```graphql
query {
category(
block: { hash: "EVENT_BLOCK_HASH" },
id: "1"
) {
__typename
id
count
name
}
}
```
* Run the `getState` query at the endpoint to get the latest `State` for `EXAMPLE_ADDRESS`:
```graphql
query {
getState (
blockHash: "EVENT_BLOCK_HASH"
contractAddress: "EXAMPLE_ADDRESS"
# kind: "checkpoint"
# kind: "diff"
kind: "diff_staged"
) {
cid
block {
cid
hash
number
timestamp
parentHash
}
contractAddress
data
}
}
```
* `diff` states get created corresponding to the `diff_staged` states when their respective blocks reach the pruned region.
* In `packages/test-watcher`:
* After the `diff` state has been created, create a `checkpoint`:
```bash
yarn checkpoint create --address $EXAMPLE_ADDRESS
```
* A `checkpoint` state should be created at the latest canonical block hash.
* Run the `getState` query again at the endpoint with the output `blockHash` and kind `checkpoint`.
* All the `State` entries can be seen in `pg-admin` in table `state`.

View File

@ -271,6 +271,11 @@ export class Indexer implements IndexerInterface {
return undefined;
}
async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
// TODO: Call checkpoint hook.
return false;
}
async processCheckpoint (blockHash: string): Promise<void> {
// TODO Implement
}

View File

@ -142,6 +142,12 @@ GQL console: http://localhost:3006/graphql
yarn reset job-queue --block-number <previous-block-number>
```
* Reset state:
```bash
yarn reset state --block-number <previous-block-number>
```
* `block-number`: Block number to which to reset the watcher.
* To export and import the watcher state:

View File

@ -244,4 +244,16 @@ export class Indexer implements IndexerInterface {
async resetWatcherToBlock (blockNumber: number): Promise<void> {
return undefined;
}
cacheContract (contract: ContractInterface): void {
return undefined;
}
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
return undefined;
}
async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
return false;
}
}

View File

@ -136,6 +136,12 @@ GQL console: http://localhost:3010/graphql
yarn reset job-queue --block-number <previous-block-number>
```
* Reset state:
```bash
yarn reset state --block-number <previous-block-number>
```
* `block-number`: Block number to which to reset the watcher.
* To export and import the watcher state:

View File

@ -482,22 +482,6 @@ export class Database {
async getLatestPrunedEntity<Entity> (repo: Repository<Entity>, id: string, canonicalBlockNumber: number): Promise<Entity | undefined> {
// Filter out latest entity from pruned blocks.
const entityInPrunedRegion = await repo.createQueryBuilder('entity')
.innerJoinAndSelect('block_progress', 'block', 'block.block_hash = entity.block_hash')
.where('block.is_pruned = false')
.andWhere('entity.id = :id', { id })
.andWhere('entity.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('entity.block_number', 'DESC')
.limit(1)
.getOne();
return entityInPrunedRegion;
}
async getLatestPrunedEntityWithoutJoin<Entity> (repo: Repository<Entity>, id: string, canonicalBlockNumber: number): Promise<Entity | undefined> {
// Filter out latest entity from pruned blocks.
const entityInPrunedRegion = await repo.createQueryBuilder('entity')
.where('entity.id = :id', { id })
.andWhere('entity.is_pruned = false')

View File

@ -687,7 +687,6 @@ export class Indexer {
}
// Call initial state hook.
assert(indexer.processInitialState);
const stateData = await indexer.processInitialState(contract.address, blockHash);
const block = await this.getBlockProgress(blockHash);
@ -800,7 +799,6 @@ export class Indexer {
assert(currentBlock.blockNumber <= stateSyncStatus.latestIndexedBlockNumber, 'State should be indexed for checkpoint at a block');
// Call state checkpoint hook and check if default checkpoint is disabled.
assert(indexer.processStateCheckpoint);
const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash);
if (disableDefaultCheckpoint) {

View File

@ -423,10 +423,7 @@ export class JobRunner {
_updateWatchedContracts (job: any): void {
const { data: { contract } } = job;
assert(this._indexer.cacheContract);
this._indexer.cacheContract(contract);
this._indexer.updateStateStatusMap(contract.address, {});
}
}

View File

@ -109,13 +109,12 @@ export interface IndexerInterface {
parseEventNameAndArgs?: (kind: string, logObj: any) => any
isWatchedContract: (address: string) => ContractInterface | undefined;
getContractsByKind?: (kind: string) => ContractInterface[]
cacheContract?: (contract: ContractInterface) => void;
cacheContract: (contract: ContractInterface) => void;
watchContract: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
getEntityTypesMap?: () => Map<string, { [key: string]: string }>
getRelationsMap?: () => Map<any, { [key: string]: any }>
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
processInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
processInitialState: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint: (contractAddress: string, blockHash: string) => Promise<boolean>
processBlock: (blockProgres: BlockProgressInterface) => Promise<void>
processBlockAfterEvents?: (blockHash: string, blockNumber: number) => Promise<void>
processCanonicalBlock (blockHash: string, blockNumber: number): Promise<void>
@ -175,7 +174,7 @@ export interface DatabaseInterface {
getDiffStatesInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<StateInterface[]>
getNewState (): StateInterface
removeStates(queryRunner: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
removeStatesAfterBlock?: (queryRunner: QueryRunner, blockNumber: number) => Promise<void>
removeStatesAfterBlock: (queryRunner: QueryRunner, blockNumber: number) => Promise<void>
saveOrUpdateState (queryRunner: QueryRunner, state: StateInterface): Promise<StateInterface>
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>