mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-07 20:08:06 +00:00
Batch diff creation for subgraph entities updated in mapping code (#172)
* Batch diff creation for subgraph entities updated in mapping code * Propagate changes to graph-test-watcher and codegen
This commit is contained in:
parent
288e153287
commit
f3091dee3d
@ -50,10 +50,10 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
{{/if}}
|
||||
|
||||
|
@ -44,7 +44,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
|
||||
// Use indexer.createStateDiff() method to save custom state diff(s).
|
||||
// Use indexer.createDiff() method to save custom state diff(s).
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -7,6 +7,7 @@ import debug from 'debug';
|
||||
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
|
||||
import JSONbig from 'json-bigint';
|
||||
import { ethers } from 'ethers';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { JsonFragment } from '@ethersproject/abi';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
@ -119,7 +120,9 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
{{#if (subgraphPath)}}
|
||||
_entityTypesMap: Map<string, { [key: string]: string }>
|
||||
_relationsMap: Map<any, { [key: string]: any }>
|
||||
|
||||
|
||||
_subgraphStateMap: Map<string, any>
|
||||
|
||||
{{/if}}
|
||||
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue{{#if (subgraphPath)}}, graphWatcher: GraphWatcher{{/if}}) {
|
||||
assert(db);
|
||||
@ -164,6 +167,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
this._relationsMap = new Map();
|
||||
this._populateRelationsMap();
|
||||
|
||||
this._subgraphStateMap = new Map();
|
||||
{{/if}}
|
||||
}
|
||||
|
||||
@ -462,6 +467,9 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
async processBlockAfterEvents (blockHash: string): Promise<void> {
|
||||
// Call subgraph handler for block.
|
||||
await this._graphWatcher.handleBlock(blockHash);
|
||||
|
||||
// Persist subgraph state to the DB.
|
||||
await this._dumpSubgraphState(blockHash);
|
||||
}
|
||||
|
||||
{{/if}}
|
||||
@ -473,7 +481,7 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
const logDescription = contract.parseLog({ data, topics });
|
||||
|
||||
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription)
|
||||
const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription);
|
||||
|
||||
return {
|
||||
eventName,
|
||||
@ -654,9 +662,17 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
return this._entityTypesMap;
|
||||
}
|
||||
|
||||
{{/if}}
|
||||
getRelationsMap (): Map<any, { [key: string]: any }> {
|
||||
return this._relationsMap;
|
||||
}
|
||||
|
||||
updateSubgraphState (contractAddress: string, data: any): void {
|
||||
// Update the subgraph state for a given contract.
|
||||
const oldData = this._subgraphStateMap.get(contractAddress);
|
||||
const updatedData = _.merge(oldData, data);
|
||||
this._subgraphStateMap.set(contractAddress, updatedData);
|
||||
}
|
||||
|
||||
{{#if (subgraphPath)}}
|
||||
_populateEntityTypesMap (): void {
|
||||
{{#each subgraphEntities as | subgraphEntity |}}
|
||||
this._entityTypesMap.set('{{subgraphEntity.className}}', {
|
||||
@ -671,9 +687,7 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
});
|
||||
{{/each}}
|
||||
}
|
||||
{{/if}}
|
||||
|
||||
{{#if (subgraphPath)}}
|
||||
_populateRelationsMap (): void {
|
||||
{{#each subgraphEntities as | subgraphEntity |}}
|
||||
{{#if subgraphEntity.relations}}
|
||||
@ -696,6 +710,19 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
{{/if}}
|
||||
{{/each}}
|
||||
}
|
||||
|
||||
async _dumpSubgraphState (blockHash: string): Promise<void> {
|
||||
// Create a diff for each contract in the subgraph state map.
|
||||
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
|
||||
.map(([contractAddress, data]): Promise<void> => {
|
||||
return this.createDiffStaged(contractAddress, blockHash, data);
|
||||
});
|
||||
|
||||
await Promise.all(createDiffPromises);
|
||||
|
||||
// Reset the subgraph state map.
|
||||
this._subgraphStateMap.clear();
|
||||
}
|
||||
{{/if}}
|
||||
|
||||
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
|
||||
@ -718,8 +745,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
// Flatten logs by contract and sort by index.
|
||||
logs = contractlogs.map(data => {
|
||||
return data.logs;
|
||||
}).flat()
|
||||
return data.logs;
|
||||
}).flat()
|
||||
.sort((a, b) => {
|
||||
return a.index - b.index;
|
||||
});
|
||||
@ -730,7 +757,7 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
let [
|
||||
{ block },
|
||||
{
|
||||
{
|
||||
allEthHeaderCids: {
|
||||
nodes: [
|
||||
{
|
||||
|
@ -47,10 +47,10 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
{{/if}}
|
||||
|
||||
|
@ -258,10 +258,10 @@ export const main = async (): Promise<any> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, 'entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
{{/if}}
|
||||
|
||||
|
@ -43,10 +43,10 @@ export const handler = async (argv: any): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
{{/if}}
|
||||
|
||||
|
@ -63,10 +63,10 @@ const main = async (): Promise<void> => {
|
||||
const db = new Database(config.database);
|
||||
await db.init();
|
||||
{{#if (subgraphPath)}}
|
||||
|
||||
|
||||
const graphDb = new GraphDatabase(config.database, path.resolve(__dirname, '../entity/*'));
|
||||
await graphDb.init();
|
||||
|
||||
|
||||
const graphWatcher = new GraphWatcher(graphDb, ethClient, ethProvider, config.server);
|
||||
{{/if}}
|
||||
|
||||
|
@ -39,7 +39,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
|
||||
// Use indexer.createStateDiff() method to create a custom diff.
|
||||
// Use indexer.createDiff() method to save custom state diff(s).
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -7,6 +7,7 @@ import debug from 'debug';
|
||||
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
|
||||
import JSONbig from 'json-bigint';
|
||||
import { ethers } from 'ethers';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { JsonFragment } from '@ethersproject/abi';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
@ -119,6 +120,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
_entityTypesMap: Map<string, { [key: string]: string }>
|
||||
_relationsMap: Map<any, { [key: string]: any }>
|
||||
|
||||
_subgraphStateMap: Map<string, any>
|
||||
|
||||
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
|
||||
assert(db);
|
||||
assert(ethClient);
|
||||
@ -165,6 +168,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
this._relationsMap = new Map();
|
||||
this._populateRelationsMap();
|
||||
|
||||
this._subgraphStateMap = new Map();
|
||||
}
|
||||
|
||||
get serverConfig (): ServerConfig {
|
||||
@ -401,6 +406,13 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
await this._graphWatcher.handleBlock(blockHash);
|
||||
|
||||
console.timeEnd('time:indexer#processBlockAfterEvents-mapping_code');
|
||||
|
||||
console.time('time:indexer#processBlockAfterEvents-dump_subgraph_state');
|
||||
|
||||
// Persist subgraph state to the DB.
|
||||
await this._dumpSubgraphState(blockHash);
|
||||
|
||||
console.timeEnd('time:indexer#processBlockAfterEvents-dump_subgraph_state');
|
||||
}
|
||||
|
||||
parseEventNameAndArgs (kind: string, logObj: any): any {
|
||||
@ -591,6 +603,13 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
return this._relationsMap;
|
||||
}
|
||||
|
||||
updateSubgraphState (contractAddress: string, data: any): void {
|
||||
// Update the subgraph state for a given contract.
|
||||
const oldData = this._subgraphStateMap.get(contractAddress);
|
||||
const updatedData = _.merge(oldData, data);
|
||||
this._subgraphStateMap.set(contractAddress, updatedData);
|
||||
}
|
||||
|
||||
_populateEntityTypesMap (): void {
|
||||
this._entityTypesMap.set(
|
||||
'Producer',
|
||||
@ -952,6 +971,19 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
});
|
||||
}
|
||||
|
||||
async _dumpSubgraphState (blockHash: string): Promise<void> {
|
||||
// Create a diff for each contract in the subgraph state map.
|
||||
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
|
||||
.map(([contractAddress, data]): Promise<void> => {
|
||||
return this.createDiffStaged(contractAddress, blockHash, data);
|
||||
});
|
||||
|
||||
await Promise.all(createDiffPromises);
|
||||
|
||||
// Reset the subgraph state map.
|
||||
this._subgraphStateMap.clear();
|
||||
}
|
||||
|
||||
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
|
||||
assert(blockHash);
|
||||
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
|
||||
|
@ -45,7 +45,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
|
||||
// Use indexer.createStateDiff() method to save custom state diff(s).
|
||||
// Use indexer.createDiff() method to save custom state diff(s).
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -141,10 +141,10 @@ export const instantiate = async (
|
||||
[dbData.id]: JSON.parse(JSON.stringify(dbData, jsonBigIntStringReplacer))
|
||||
};
|
||||
|
||||
// Create an auto-diff.
|
||||
assert(indexer.createDiffStaged);
|
||||
// Update the in-memory subgraph state.
|
||||
assert(indexer.updateSubgraphState);
|
||||
assert(context.contractAddress);
|
||||
await indexer.createDiffStaged(context.contractAddress, context.block.blockHash, diffData);
|
||||
indexer.updateSubgraphState(context.contractAddress, diffData);
|
||||
},
|
||||
|
||||
'log.log': (level: number, msg: number) => {
|
||||
|
@ -36,7 +36,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
|
||||
// Use indexer.createStateDiff() method to create a custom diff.
|
||||
// Use indexer.createDiff() method to save custom state diff(s).
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -7,6 +7,7 @@ import debug from 'debug';
|
||||
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
|
||||
import JSONbig from 'json-bigint';
|
||||
import { ethers } from 'ethers';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { JsonFragment } from '@ethersproject/abi';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
@ -103,6 +104,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
_entityTypesMap: Map<string, { [key: string]: string }>
|
||||
_relationsMap: Map<any, { [key: string]: any }>
|
||||
|
||||
_subgraphStateMap: Map<string, any>
|
||||
|
||||
constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) {
|
||||
assert(db);
|
||||
assert(ethClient);
|
||||
@ -135,6 +138,8 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
|
||||
this._relationsMap = new Map();
|
||||
this._populateRelationsMap();
|
||||
|
||||
this._subgraphStateMap = new Map();
|
||||
}
|
||||
|
||||
get serverConfig (): ServerConfig {
|
||||
@ -404,6 +409,9 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
async processBlockAfterEvents (blockHash: string): Promise<void> {
|
||||
// Call subgraph handler for block.
|
||||
await this._graphWatcher.handleBlock(blockHash);
|
||||
|
||||
// Persist subgraph state to the DB.
|
||||
await this._dumpSubgraphState(blockHash);
|
||||
}
|
||||
|
||||
parseEventNameAndArgs (kind: string, logObj: any): any {
|
||||
@ -594,6 +602,17 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
return this._entityTypesMap;
|
||||
}
|
||||
|
||||
getRelationsMap (): Map<any, { [key: string]: any }> {
|
||||
return this._relationsMap;
|
||||
}
|
||||
|
||||
updateSubgraphState (contractAddress: string, data: any): void {
|
||||
// Update the subgraph state for a given contract.
|
||||
const oldData = this._subgraphStateMap.get(contractAddress);
|
||||
const updatedData = _.merge(oldData, data);
|
||||
this._subgraphStateMap.set(contractAddress, updatedData);
|
||||
}
|
||||
|
||||
_populateEntityTypesMap (): void {
|
||||
this._entityTypesMap.set(
|
||||
'Author',
|
||||
@ -655,6 +674,19 @@ export class Indexer implements IPLDIndexerInterface {
|
||||
});
|
||||
}
|
||||
|
||||
async _dumpSubgraphState (blockHash: string): Promise<void> {
|
||||
// Create a diff for each contract in the subgraph state map.
|
||||
const createDiffPromises = Array.from(this._subgraphStateMap.entries())
|
||||
.map(([contractAddress, data]): Promise<void> => {
|
||||
return this.createDiffStaged(contractAddress, blockHash, data);
|
||||
});
|
||||
|
||||
await Promise.all(createDiffPromises);
|
||||
|
||||
// Reset the subgraph state map.
|
||||
this._subgraphStateMap.clear();
|
||||
}
|
||||
|
||||
async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial<BlockProgress>): Promise<BlockProgress> {
|
||||
assert(blockHash);
|
||||
const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash });
|
||||
|
@ -49,7 +49,7 @@ export async function createStateDiff (indexer: Indexer, blockHash: string): Pro
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
|
||||
// Use indexer.createStateDiff() method to save custom state diff(s).
|
||||
// Use indexer.createDiff() method to save custom state diff(s).
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -113,6 +113,7 @@ export interface IndexerInterface {
|
||||
processBlock?: (blockHash: string, blockNumber: number) => Promise<void>
|
||||
processBlockAfterEvents?: (blockHash: string) => Promise<void>
|
||||
getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise<ValueResult>
|
||||
updateSubgraphState?: (contractAddress: string, data: any) => void
|
||||
}
|
||||
|
||||
export interface IPLDIndexerInterface extends IndexerInterface {
|
||||
|
Loading…
Reference in New Issue
Block a user