Changes to use packages in uniswap-watcher (#196)

* Changes to use util from uniswap-watcher

* Refactor ResultIPLDBlock to util

* Verify state in compare CLI for uniswap multiple queries

* Prepare watcher-ts for publishing packages

* Fix verify state in compare CLI for multiple entities query

* Fix codegen util imports
This commit is contained in:
nikugogoi 2022-10-11 13:41:26 +05:30 committed by GitHub
parent 978f0bb456
commit e1aef1a7e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1227 additions and 990 deletions

View File

@ -4,5 +4,10 @@
],
"version": "0.1.0",
"npmClient": "yarn",
"useWorkspaces": true
"useWorkspaces": true,
"command": {
"publish": {
"registry": "https://git.vdb.to/api/packages/cerc-io/npm/"
}
}
}

View File

@ -18,6 +18,7 @@
"build": "lerna run build --stream",
"build:watch": "lerna run build --stream --parallel -- -w",
"db:reset": "sudo ./scripts/reset-dbs.sh",
"prepare": "husky install"
"prepare": "husky install",
"publish:workspace": "yarn build && lerna publish"
}
}

5
packages/cache/.npmignore vendored Normal file
View File

@ -0,0 +1,5 @@
/src/
index.ts
tsconfig.json
.eslintrc.json
.eslintignore

View File

@ -2,7 +2,6 @@
"name": "@cerc-io/cache",
"version": "0.1.0",
"description": "Generic object cache",
"private": true,
"main": "dist/index.js",
"scripts": {
"lint": "eslint .",

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import path from 'path';
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -18,7 +18,7 @@ import { IPLDBlock } from './entity/IPLDBlock';
import { {{query.entityName}} } from './entity/{{query.entityName}}';
{{/each}}
export class Database implements IPLDDatabaseInterface {
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;
_baseDatabase: BaseDatabase;
@ -183,11 +183,23 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {

View File

@ -16,8 +16,8 @@ import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import {
IPLDIndexer as BaseIndexer,
IPLDIndexerInterface,
Indexer as BaseIndexer,
IndexerInterface,
ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig,
@ -31,7 +31,8 @@ import {
{{/if}}
IPFSClient,
StateKind,
IpldStatus as IpldStatusInterface
IpldStatus as IpldStatusInterface,
ResultIPLDBlock
} from '@cerc-io/util';
{{#if (subgraphPath)}}
import { GraphWatcher } from '@cerc-io/graph-node';
@ -88,21 +89,7 @@ export type ResultEvent = {
proof: string;
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer implements IPLDIndexerInterface {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
@ -603,7 +590,7 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -655,8 +642,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -866,7 +853,7 @@ export class Indexer implements IPLDIndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import path from 'path';
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -15,7 +15,7 @@ import { IpldStatus } from './entity/IpldStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
export class Database implements IPLDDatabaseInterface {
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;
_baseDatabase: BaseDatabase;
@ -143,11 +143,23 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {

View File

@ -16,7 +16,7 @@ import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import {
IPLDIndexer as BaseIndexer,
Indexer as BaseIndexer,
UNKNOWN_EVENT_NAME,
ServerConfig,
JobQueue,
@ -25,9 +25,10 @@ import {
BlockHeight,
IPFSClient,
StateKind,
IPLDIndexerInterface,
IndexerInterface,
IpldStatus as IpldStatusInterface,
ValueResult
ValueResult,
ResultIPLDBlock
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';
@ -90,21 +91,7 @@ export type ResultEvent = {
proof: string;
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer implements IPLDIndexerInterface {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
@ -540,7 +527,7 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -588,8 +575,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -1120,7 +1107,7 @@ export class Indexer implements IPLDIndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm';
import path from 'path';
import { Database as BaseDatabase, QueryOptions, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Allowance } from './entity/Allowance';
import { Balance } from './entity/Balance';
@ -14,8 +14,10 @@ import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
import { SyncStatus } from './entity/SyncStatus';
import { BlockProgress } from './entity/BlockProgress';
import { IPLDBlock } from './entity/IPLDBlock';
import { IpldStatus } from './entity/IpldStatus';
export class Database {
export class Database implements DatabaseInterface {
_config: ConnectionOptions
_conn!: Connection
_baseDatabase: BaseDatabase;
@ -39,6 +41,47 @@ export class Database {
return this._baseDatabase.close();
}
getNewIPLDBlock (): IPLDBlock {
return new IPLDBlock();
}
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getIPLDBlocks(repo, where);
}
async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
const repo = dbTx.manager.getRepository(IPLDBlock);
return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
}
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
const repo = dbTx.manager.getRepository(IPLDBlock);
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
}
async getIPLDStatus (): Promise<IpldStatus | undefined> {
const repo = this._conn.getRepository(IpldStatus);
return this._baseDatabase.getIPLDStatus(repo);
}
async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise<Balance | undefined> {
return this._conn.getRepository(Balance)
.createQueryBuilder('balance')
@ -107,11 +150,17 @@ export class Database {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {
@ -173,6 +222,12 @@ export class Database {
return this._baseDatabase.getBlockProgressEntities(repo, where, options);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);

View File

@ -0,0 +1,36 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
import { StateKind } from '@cerc-io/util';
import { BlockProgress } from './BlockProgress';
@Entity()
@Index(['cid'], { unique: true })
@Index(['block', 'contractAddress'])
@Index(['block', 'contractAddress', 'kind'], { unique: true })
export class IPLDBlock {
@PrimaryGeneratedColumn()
id!: number;
@ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' })
block!: BlockProgress;
@Column('varchar', { length: 42 })
contractAddress!: string;
@Column('varchar')
cid!: string;
@Column({
type: 'enum',
enum: StateKind
})
kind!: StateKind;
@Column('bytea')
data!: Buffer;
}

View File

@ -0,0 +1,20 @@
//
// Copyright 2022 Vulcanize, Inc.
//
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';
@Entity()
export class IpldStatus {
@PrimaryGeneratedColumn()
id!: number;
@Column('integer')
latestHooksBlockNumber!: number;
@Column('integer', { nullable: true })
latestCheckpointBlockNumber!: number;
@Column('integer', { nullable: true })
latestIPFSBlockNumber!: number;
}

View File

@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig } from '@cerc-io/util';
import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig, IPFSClient, IpldStatus as IpldStatusInterface } from '@cerc-io/util';
import { Database } from './database';
import { Event } from './entity/Event';
@ -21,6 +21,7 @@ import { SyncStatus } from './entity/SyncStatus';
import artifacts from './artifacts/ERC20.json';
import { BlockProgress } from './entity/BlockProgress';
import { Contract } from './entity/Contract';
import { IPLDBlock } from './entity/IPLDBlock';
const log = debug('vulcanize:indexer');
const JSONbigNative = JSONbig({ useNativeBigInt: true });
@ -63,7 +64,8 @@ export class Indexer implements IndexerInterface {
this._ethProvider = ethProvider;
this._serverConfig = serverConfig;
this._serverMode = serverConfig.mode;
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue);
const ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient);
const { abi, storageLayout } = artifacts;
@ -248,6 +250,10 @@ export class Indexer implements IndexerInterface {
);
}
getIPLDData (ipldBlock: IPLDBlock): any {
return this._baseIndexer.getIPLDData(ipldBlock);
}
async triggerIndexingOnEvent (event: Event): Promise<void> {
const { eventName, eventInfo, contract: token, block: { blockHash } } = event;
const eventFields = JSON.parse(eventInfo);
@ -278,6 +284,11 @@ export class Indexer implements IndexerInterface {
await this.triggerIndexingOnEvent(event);
}
async processBlock (blockProgress: BlockProgress): Promise<void> {
// Call a function to create initial state for contracts.
await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber);
}
parseEventNameAndArgs (kind: string, logObj: any): any {
const { topics, data } = logObj;
const logDescription = this._contract.parseLog({ data, topics });
@ -291,7 +302,7 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -299,6 +310,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
}
async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise<void> {
await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus);
}
cacheContract (contract: Contract): void {
return this._baseIndexer.cacheContract(contract);
}
@ -351,8 +366,8 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -451,7 +466,7 @@ export class Indexer implements IndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, FindOneOptions, LessThanOrEqual } from 'typeorm';
import path from 'path';
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -30,7 +30,7 @@ import { _TokenApprovals } from './entity/_TokenApprovals';
import { _OperatorApprovals } from './entity/_OperatorApprovals';
import { TransferCount } from './entity/TransferCount';
export class Database implements IPLDDatabaseInterface {
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;
_baseDatabase: BaseDatabase;
@ -398,11 +398,23 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {

View File

@ -38,7 +38,7 @@ export async function createInitialState (indexer: Indexer, contractAddress: str
/**
* Hook function to create state diff.
* @param indexer Indexer instance that contains methods to fetch the contract varaiable values.
* @param indexer Indexer instance that contains methods to fetch the contract variable values.
* @param blockHash Block hash of the concerned block.
*/
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {

View File

@ -14,8 +14,8 @@ import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import {
IPLDIndexer as BaseIndexer,
IPLDIndexerInterface,
Indexer as BaseIndexer,
IndexerInterface,
ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig,
@ -27,7 +27,8 @@ import {
BlockHeight,
IPFSClient,
StateKind,
IpldStatus as IpldStatusInterface
IpldStatus as IpldStatusInterface,
ResultIPLDBlock
} from '@cerc-io/util';
import ERC721Artifacts from './artifacts/ERC721.json';
@ -70,21 +71,7 @@ export type ResultEvent = {
proof: string;
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer implements IPLDIndexerInterface {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
@ -907,7 +894,7 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -959,8 +946,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -1098,7 +1085,7 @@ export class Indexer implements IPLDIndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -13,7 +13,7 @@
entitiesDir = "../../graph-test-watcher/dist/entity/*"
endpoint = "gqlEndpoint2"
verifyState = true
derivedFields = []
skipFields = []
[cache]
endpoint = "gqlEndpoint1"

View File

@ -3,6 +3,7 @@
"version": "0.1.0",
"main": "dist/index.js",
"license": "AGPL-3.0",
"private": true,
"devDependencies": {
"@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",

View File

@ -12,7 +12,7 @@ import _ from 'lodash';
import { getConfig as getWatcherConfig, wait } from '@cerc-io/util';
import { GraphQLClient } from '@cerc-io/ipld-eth-client';
import { checkEntityInIPLDState, compareQuery, Config, getIPLDsByBlock, checkIPLDMetaData, combineIPLDState, getClients, getConfig } from './utils';
import { checkGQLEntityInIPLDState, compareQuery, Config, getIPLDsByBlock, checkIPLDMetaData, combineIPLDState, getClients, getConfig, checkGQLEntitiesInIPLDState } from './utils';
import { Database } from '../../database';
import { getSubgraphConfig } from '../../utils';
@ -168,7 +168,7 @@ export const main = async (): Promise<void> => {
);
if (config.watcher.verifyState) {
const ipldDiff = await checkEntityInIPLDState(ipldStateByBlock, queryName, result, id, rawJson, config.watcher.derivedFields);
const ipldDiff = await checkGQLEntityInIPLDState(ipldStateByBlock, entityName, result[queryName], id, rawJson, config.watcher.skipFields);
if (ipldDiff) {
log('Results mismatch for IPLD state:', ipldDiff);
@ -182,13 +182,24 @@ export const main = async (): Promise<void> => {
}
} else {
if (updatedEntities.has(entityName)) {
({ diff: resultDiff } = await compareQuery(
let result;
({ diff: resultDiff, result1: result } = await compareQuery(
clients,
queryName,
{ block },
rawJson,
timeDiff
));
if (config.watcher.verifyState) {
const ipldDiff = await checkGQLEntitiesInIPLDState(ipldStateByBlock, entityName, result[queryName], rawJson, config.watcher.skipFields);
if (ipldDiff) {
log('Results mismatch for IPLD state:', ipldDiff);
diffFound = true;
}
}
}
}

View File

@ -50,7 +50,7 @@ interface QueryConfig {
queryLimits: { [queryName: string]: number }
}
interface EntityDerivedFields {
interface EntitySkipFields {
entity: string;
fields: string[];
}
@ -63,7 +63,7 @@ export interface Config {
entitiesDir: string;
verifyState: boolean;
endpoint: keyof EndpointConfig;
derivedFields: EntityDerivedFields[]
skipFields: EntitySkipFields[]
}
cache: {
endpoint: keyof EndpointConfig;
@ -312,34 +312,69 @@ export const combineIPLDState = (contractIPLDs: {[key: string]: any}[]): {[key:
return contractIPLDStates.reduce((acc, state) => _.merge(acc, state));
};
export const checkEntityInIPLDState = async (
export const checkGQLEntityInIPLDState = async (
ipldState: {[key: string]: any},
queryName: string,
entityName: string,
entityResult: {[key: string]: any},
id: string,
rawJson: boolean,
derivedFields: EntityDerivedFields[] = []
skipFields: EntitySkipFields[] = []
): Promise<string> => {
const entityName = _.upperFirst(queryName);
const ipldEntity = ipldState[entityName][id];
// Filter __typename key in GQL result.
const resultEntity = omitDeep(entityResult[queryName], '__typename');
entityResult = omitDeep(entityResult, '__typename');
// Filter derived fields in GQL result.
derivedFields.forEach(({ entity, fields }) => {
// Filter skipped fields in state comaparison.
skipFields.forEach(({ entity, fields }) => {
if (entityName === entity) {
fields.forEach(field => {
delete resultEntity[field];
});
omitDeep(entityResult, fields);
omitDeep(ipldEntity, fields);
}
});
const diff = compareObjects(resultEntity, ipldEntity, rawJson);
const diff = compareObjects(entityResult, ipldEntity, rawJson);
return diff;
};
export const checkGQLEntitiesInIPLDState = async (
ipldState: {[key: string]: any},
entityName: string,
entitiesResult: any[],
rawJson: boolean,
skipFields: EntitySkipFields[] = []
): Promise<string> => {
// Form entities from state to compare with GQL result
const stateEntities = ipldState[entityName];
for (const entityResult of entitiesResult) {
const stateEntity = stateEntities[entityResult.id];
// Verify state if entity from GQL result is present in state.
if (stateEntity) {
// Filter __typename key in GQL result.
entitiesResult = omitDeep(entityResult, '__typename');
// Filter skipped fields in state comaparison.
skipFields.forEach(({ entity, fields }) => {
if (entityName === entity) {
omitDeep(entityResult, fields);
omitDeep(stateEntity, fields);
}
});
const diff = compareObjects(entityResult, stateEntity, rawJson);
if (diff) {
return diff;
}
}
}
return '';
};
// obj1: expected
// obj2: actual
const compareObjects = (obj1: any, obj2: any, rawJson: boolean): string => {

View File

@ -100,6 +100,7 @@ export const instantiate = async (
database.cacheUpdatedEntity(entityName, dbEntity);
// Update the in-memory subgraph state if not disabled.
// TODO: enableSubgraphState
if (!indexer.serverConfig.disableSubgraphState) {
// Prepare diff data for the entity update
assert(indexer.getRelationsMap);

View File

@ -12,7 +12,7 @@ import { SelectionNode } from 'graphql';
import { ResultObject } from '@vulcanize/assemblyscript/lib/loader';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IPLDIndexerInterface, BlockProgressInterface } from '@cerc-io/util';
import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IndexerInterface, BlockProgressInterface } from '@cerc-io/util';
import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils';
import { Context, GraphData, instantiate } from './loader';
@ -28,7 +28,7 @@ interface DataSource {
export class GraphWatcher {
_database: Database;
_indexer?: IPLDIndexerInterface;
_indexer?: IndexerInterface;
_ethClient: EthClient;
_ethProvider: providers.BaseProvider;
_subgraphPath: string;
@ -254,7 +254,7 @@ export class GraphWatcher {
}
}
setIndexer (indexer: IPLDIndexerInterface): void {
setIndexer (indexer: IndexerInterface): void {
this._indexer = indexer;
}

View File

@ -7,7 +7,10 @@ import {
EventInterface,
SyncStatusInterface,
ServerConfig as ServerConfigInterface,
ValueResult
ValueResult,
ContractInterface,
IpldStatus as IpldStatusInterface,
IPLDBlockInterface
} from '@cerc-io/util';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
@ -85,7 +88,7 @@ export class Indexer implements IndexerInterface {
return '';
}
async fetchBlockEvents (block: BlockProgressInterface): Promise<BlockProgressInterface> {
async fetchBlockWithEvents (block: BlockProgressInterface): Promise<BlockProgressInterface> {
return block;
}
@ -153,6 +156,22 @@ export class Indexer implements IndexerInterface {
async processEvent (event: EventInterface): Promise<void> {
assert(event);
}
isWatchedContract (address : string): ContractInterface | undefined {
return undefined;
}
async processBlock (blockProgress: BlockProgressInterface): Promise<void> {
return undefined;
}
getIPLDData (ipldBlock: IPLDBlockInterface): any {
return undefined;
}
async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise<void> {
return undefined;
}
}
class SyncStatus implements SyncStatusInterface {

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
import path from 'path';
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -18,7 +18,7 @@ import { IPLDBlock } from './entity/IPLDBlock';
import { GetMethod } from './entity/GetMethod';
import { _Test } from './entity/_Test';
export class Database implements IPLDDatabaseInterface {
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;
_baseDatabase: BaseDatabase;
@ -171,11 +171,23 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {

View File

@ -16,7 +16,7 @@ import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper';
import {
IPLDIndexer as BaseIndexer,
Indexer as BaseIndexer,
ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig,
@ -27,8 +27,9 @@ import {
BlockHeight,
IPFSClient,
StateKind,
IPLDIndexerInterface,
IpldStatus as IpldStatusInterface
IndexerInterface,
IpldStatus as IpldStatusInterface,
ResultIPLDBlock
} from '@cerc-io/util';
import { GraphWatcher } from '@cerc-io/graph-node';
@ -74,21 +75,7 @@ export type ResultEvent = {
proof: string;
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer implements IPLDIndexerInterface {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: BaseProvider
@ -536,7 +523,7 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -588,8 +575,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -820,7 +807,7 @@ export class Indexer implements IPLDIndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -0,0 +1,5 @@
/src/
index.ts
tsconfig.json
.eslintrc.json
.eslintignore

View File

@ -2,7 +2,6 @@
"name": "@cerc-io/ipld-eth-client",
"version": "0.1.0",
"description": "IPLD ETH Client",
"private": true,
"main": "dist/index.js",
"scripts": {
"lint": "eslint .",

View File

@ -6,7 +6,7 @@ import assert from 'assert';
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, LessThanOrEqual } from 'typeorm';
import path from 'path';
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util';
import { Contract } from './entity/Contract';
import { Event } from './entity/Event';
@ -20,7 +20,7 @@ import { IsRevoked } from './entity/IsRevoked';
import { IsPhisher } from './entity/IsPhisher';
import { IsMember } from './entity/IsMember';
export class Database implements IPLDDatabaseInterface {
export class Database implements DatabaseInterface {
_config: ConnectionOptions;
_conn!: Connection;
_baseDatabase: BaseDatabase;
@ -230,11 +230,23 @@ export class Database implements IPLDDatabaseInterface {
return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions);
}
async saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>, events: DeepPartial<Event>[]): Promise<BlockProgress> {
const blockRepo = queryRunner.manager.getRepository(BlockProgress);
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events);
}
async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise<void> {
const eventRepo = queryRunner.manager.getRepository(Event);
return this._baseDatabase.saveEvents(eventRepo, events);
}
async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
const repo = queryRunner.manager.getRepository(BlockProgress);
return this._baseDatabase.saveBlockProgress(repo, block);
}
async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<Contract> {

View File

@ -14,8 +14,8 @@ import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import {
IPLDIndexer as BaseIndexer,
IPLDIndexerInterface,
Indexer as BaseIndexer,
IndexerInterface,
ValueResult,
UNKNOWN_EVENT_NAME,
ServerConfig,
@ -28,7 +28,8 @@ import {
IPFSClient,
StateKind,
IpldStatus as IpldStatusInterface,
getFullTransaction
getFullTransaction,
ResultIPLDBlock
} from '@cerc-io/util';
import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json';
@ -75,21 +76,7 @@ export type ResultEvent = {
proof: string;
};
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer implements IPLDIndexerInterface {
export class Indexer implements IndexerInterface {
_db: Database
_ethClient: EthClient
_ethProvider: JsonRpcProvider
@ -634,7 +621,7 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getEventsByFilter(blockHash, contract, name);
}
async isWatchedContract (address : string): Promise<Contract | undefined> {
isWatchedContract (address : string): Contract | undefined {
return this._baseIndexer.isWatchedContract(address);
}
@ -686,8 +673,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getBlocksAtHeight(height, isPruned);
}
async fetchBlockEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this));
async fetchBlockWithEvents (block: DeepPartial<BlockProgress>): Promise<BlockProgress> {
return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this));
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<Event>> {
@ -842,7 +829,7 @@ export class Indexer implements IPLDIndexerInterface {
};
console.time('time:indexer#_fetchAndSaveEvents-save-block-events');
const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents);
const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents);
await dbTx.commitTransaction();
console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events');

View File

@ -0,0 +1,10 @@
/src/
hardhat.config.ts
tsconfig.json
.eslintrc.json
.eslintignore
.env
.env.example
cache
artifacts
test

View File

@ -3,6 +3,7 @@
"version": "0.1.0",
"main": "dist/index.js",
"license": "AGPL-3.0",
"private": true,
"scripts": {
"lint": "eslint .",
"build": "tsc",

6
packages/util/.npmignore Normal file
View File

@ -0,0 +1,6 @@
/src/
hardhat.config.ts
index.ts
tsconfig.json
.eslintrc.json
.eslintignore

View File

@ -14,8 +14,6 @@ export * from './src/indexer';
export * from './src/job-runner';
export * from './src/ipld-helper';
export * from './src/graph-decimal';
export * from './src/ipld-indexer';
export * from './src/ipld-database';
export * from './src/ipfs';
export * from './src/index-block';
export * from './src/metrics';

View File

@ -22,6 +22,8 @@ export interface JobQueueConfig {
maxCompletionLagInSecs: number;
jobDelayInMilliSecs?: number;
eventsInBatch: number;
lazyUpdateBlockProgress?: boolean;
subgraphEventsOrder: boolean;
}
export interface ServerConfig {

View File

@ -4,6 +4,7 @@
import assert from 'assert';
import {
Between,
Connection,
ConnectionOptions,
createConnection,
@ -18,8 +19,9 @@ import {
} from 'typeorm';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
import { Pool } from 'pg';
import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types';
import { BlockProgressInterface, ContractInterface, EventInterface, IPLDBlockInterface, IpldStatusInterface, StateKind, SyncStatusInterface } from './types';
import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants';
import { blockProgressCount, eventCount } from './metrics';
@ -67,12 +69,22 @@ export type Relation = string | { property: string, alias: string }
export class Database {
_config: ConnectionOptions
_conn!: Connection
_pgPool: Pool
_blockCount = 0
_eventCount = 0
constructor (config: ConnectionOptions) {
assert(config);
this._config = config;
assert(config.type === 'postgres');
this._pgPool = new Pool({
user: config.username,
host: config.host,
database: config.database,
password: config.password,
port: config.port
});
}
async init (): Promise<Connection> {
@ -167,6 +179,13 @@ export class Database {
.getMany();
}
async saveBlockProgress (repo: Repository<BlockProgressInterface>, block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
this._blockCount++;
blockProgressCount.set(this._blockCount);
return await repo.save(block);
}
async updateBlockProgress (repo: Repository<BlockProgressInterface>, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface> {
if (!block.isComplete) {
if (lastProcessedEventIndex <= block.lastProcessedEventIndex) {
@ -222,7 +241,7 @@ export class Database {
return queryBuilder.getMany();
}
async saveEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface> {
async saveBlockWithEvents (blockRepo: Repository<BlockProgressInterface>, eventRepo: Repository<EventInterface>, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface> {
const {
cid,
blockHash,
@ -258,17 +277,18 @@ export class Database {
this._blockCount++;
blockProgressCount.set(this._blockCount);
let blockEventCount = 0;
// Bulk insert events.
events.forEach(event => {
event.block = blockProgress;
if (event.eventName !== UNKNOWN_EVENT_NAME) {
blockEventCount++;
}
});
await this.saveEvents(eventRepo, events);
return blockProgress;
}
async saveEvents (eventRepo: Repository<EventInterface>, events: DeepPartial<EventInterface>[]): Promise<void> {
// Bulk insert events.
const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH);
const insertPromises = eventBatches.map(async events => {
@ -280,10 +300,8 @@ export class Database {
});
await Promise.all(insertPromises);
this._eventCount += blockEventCount;
this._eventCount += events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME).length;
eventCount.set(this._eventCount);
return blockProgress;
}
async getEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<Entity[]> {
@ -540,22 +558,246 @@ export class Database {
return repo.save(entity);
}
async _fetchBlockCount (): Promise<void> {
this._blockCount = await this._conn.getRepository('block_progress')
.count();
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
let queryBuilder = repo.createQueryBuilder('ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.orderBy('block.block_number', 'DESC');
blockProgressCount.set(this._blockCount);
}
// Filter out blocks after the provided block number.
if (blockNumber) {
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
}
async _fetchEventCount (): Promise<void> {
this._eventCount = await this._conn.getRepository('event')
.count({
where: {
eventName: Not(UNKNOWN_EVENT_NAME)
// Filter using kind if specified else avoid diff_staged block.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged });
// Get the first three entries.
queryBuilder.limit(3);
const results = await queryBuilder.getMany();
if (results.length) {
// Sort by (block number desc, id desc) to get the latest entry.
// At same height, IPLD blocks are expected in order ['init', 'diff', 'checkpoint'],
// and are given preference in order ['checkpoint', 'diff', 'init']
results.sort((result1, result2) => {
if (result1.block.blockNumber === result2.block.blockNumber) {
return (result1.id > result2.id) ? -1 : 1;
} else {
return (result1.block.blockNumber > result2.block.blockNumber) ? -1 : 1;
}
});
eventCount.set(this._eventCount);
return results[0];
}
}
async getPrevIPLDBlock (repo: Repository<IPLDBlockInterface>, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlockInterface | undefined> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
AND i.contract_address = $2
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
ipld_block i
ON i.block_id = b.id
AND i.contract_address = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.depth < $3
)
SELECT
block_number, id, kind
FROM
cte_query
ORDER BY block_number DESC, id DESC
`;
// Fetching block and id for previous IPLDBlock in frothy region.
const queryResult = await repo.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
const latestRequiredResult = kind
? queryResult.find((obj: any) => obj.kind === kind)
: queryResult.find((obj: any) => obj.id);
let result: IPLDBlockInterface | undefined;
if (latestRequiredResult) {
result = await repo.findOne(latestRequiredResult.id, { relations: ['block'] });
} else {
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
// Filter out IPLDBlocks from pruned blocks.
const canonicalBlockNumber = queryResult.pop().block_number + 1;
let queryBuilder = repo.createQueryBuilder('ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('block.block_number', 'DESC');
// Filter using kind if specified else order by id to give preference to checkpoint.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
// Get the first entry.
queryBuilder.limit(1);
result = await queryBuilder.getOne();
}
return result;
}
async getIPLDBlocks (repo: Repository<IPLDBlockInterface>, where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]> {
return repo.find({ where, relations: ['block'] });
}
async getDiffIPLDBlocksInRange (repo: Repository<IPLDBlockInterface>, contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlockInterface[]> {
return repo.find({
relations: ['block'],
where: {
contractAddress,
kind: StateKind.Diff,
block: {
isPruned: false,
blockNumber: Between(startblock + 1, endBlock)
}
},
order: {
block: 'ASC'
}
});
}
async saveOrUpdateIPLDBlock (repo: Repository<IPLDBlockInterface>, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
let updatedData: {[key: string]: any};
console.time('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
if (ipldBlock.id) {
// Using pg query as workaround for typeorm memory issue when saving checkpoint with large sized data.
const { rows } = await this._pgPool.query(`
UPDATE ipld_block
SET block_id = $1, contract_address = $2, cid = $3, kind = $4, data = $5
WHERE id = $6
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data, ipldBlock.id]);
updatedData = rows[0];
} else {
const { rows } = await this._pgPool.query(`
INSERT INTO ipld_block(block_id, contract_address, cid, kind, data)
VALUES($1, $2, $3, $4, $5)
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data]);
updatedData = rows[0];
}
console.timeEnd('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
assert(updatedData);
return {
block: ipldBlock.block,
contractAddress: updatedData.contract_address,
cid: updatedData.cid,
kind: updatedData.kind,
data: updatedData.data,
id: updatedData.id
};
}
async removeIPLDBlocks (repo: Repository<IPLDBlockInterface>, blockNumber: number, kind: string): Promise<void> {
const entities = await repo.find({ relations: ['block'], where: { block: { blockNumber }, kind } });
// Delete if entities found.
if (entities.length) {
await repo.delete(entities.map((entity) => entity.id));
}
}
async removeIPLDBlocksAfterBlock (repo: Repository<IPLDBlockInterface>, blockNumber: number): Promise<void> {
// Use raw SQL as TypeORM curently doesn't support delete via 'join' or 'using'
const deleteQuery = `
DELETE FROM
ipld_block
USING block_progress
WHERE
ipld_block.block_id = block_progress.id
AND block_progress.block_number > $1;
`;
await repo.query(deleteQuery, [blockNumber]);
}
async getIPLDStatus (repo: Repository<IpldStatusInterface>): Promise<IpldStatusInterface | undefined> {
return repo.findOne();
}
async updateIPLDStatusHooksBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestHooksBlockNumber: blockNumber,
latestCheckpointBlockNumber: -1,
latestIPFSBlockNumber: -1
});
}
if (force || blockNumber > entity.latestHooksBlockNumber) {
entity.latestHooksBlockNumber = blockNumber;
}
return repo.save(entity);
}
async updateIPLDStatusCheckpointBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (force || blockNumber > entity.latestCheckpointBlockNumber) {
entity.latestCheckpointBlockNumber = blockNumber;
}
return repo.save(entity);
}
async updateIPLDStatusIPFSBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (force || blockNumber > entity.latestIPFSBlockNumber) {
entity.latestIPFSBlockNumber = blockNumber;
}
return repo.save(entity);
}
buildQuery<Entity> (repo: Repository<Entity>, selectQueryBuilder: SelectQueryBuilder<Entity>, where: Where = {}): SelectQueryBuilder<Entity> {
@ -622,7 +864,8 @@ export class Database {
orderQuery<Entity> (
repo: Repository<Entity>,
selectQueryBuilder: SelectQueryBuilder<Entity>,
orderOptions: { orderBy?: string, orderDirection?: string }
orderOptions: { orderBy?: string, orderDirection?: string },
columnPrefix = ''
): SelectQueryBuilder<Entity> {
const { orderBy, orderDirection } = orderOptions;
assert(orderBy);
@ -631,8 +874,26 @@ export class Database {
assert(columnMetadata);
return selectQueryBuilder.addOrderBy(
`${selectQueryBuilder.alias}.${columnMetadata.propertyAliasName}`,
`"${selectQueryBuilder.alias}"."${columnPrefix}${columnMetadata.databaseName}"`,
orderDirection === 'desc' ? 'DESC' : 'ASC'
);
}
async _fetchBlockCount (): Promise<void> {
this._blockCount = await this._conn.getRepository('block_progress')
.count();
blockProgressCount.set(this._blockCount);
}
async _fetchEventCount (): Promise<void> {
this._eventCount = await this._conn.getRepository('event')
.count({
where: {
eventName: Not(UNKNOWN_EVENT_NAME)
}
});
eventCount.set(this._eventCount);
}
}

View File

@ -130,7 +130,7 @@ const prefetchBlocks = async (
const blockProgress = await indexer.getBlockProgress(blockHash);
if (!blockProgress) {
await indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
await indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
}
});

View File

@ -36,7 +36,7 @@ export const indexBlock = async (
// Check if blockProgress fetched from database.
if (!partialblockProgress.id) {
blockProgress = await indexer.fetchBlockEvents(partialblockProgress);
blockProgress = await indexer.fetchBlockWithEvents(partialblockProgress);
} else {
blockProgress = partialblockProgress as BlockProgressInterface;
}

View File

@ -6,14 +6,20 @@ import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import debug from 'debug';
import { ethers } from 'ethers';
import _ from 'lodash';
import { sha256 } from 'multiformats/hashes/sha2';
import { CID } from 'multiformats/cid';
import * as codec from '@ipld/dag-cbor';
import { EthClient } from '@cerc-io/ipld-eth-client';
import { GetStorageAt, getStorageValue, StorageLayout } from '@cerc-io/solidity-mapper';
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types';
import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface, IPLDBlockInterface, IndexerInterface, StateKind } from './types';
import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING } from './constants';
import { JobQueue } from './job-queue';
import { Where, QueryOptions } from './database';
import { ServerConfig } from './config';
import { IPFSClient } from './ipfs';
const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000;
@ -26,20 +32,54 @@ export interface ValueResult {
}
}
export interface IpldStatus {
init?: number;
diff?: number;
checkpoint?: number;
// eslint-disable-next-line camelcase
diff_staged?: number;
}
export type ResultIPLDBlock = {
block: {
cid: string;
hash: string;
number: number;
timestamp: number;
parentHash: string;
};
contractAddress: string;
cid: string;
kind: string;
data: string;
};
export class Indexer {
_serverConfig: ServerConfig;
_db: DatabaseInterface;
_ethClient: EthClient;
_getStorageAt: GetStorageAt;
_ethProvider: ethers.providers.BaseProvider;
_jobQueue: JobQueue;
_ipfsClient: IPFSClient;
_watchedContracts: { [key: string]: ContractInterface } = {};
_ipldStatusMap: { [key: string]: IpldStatus } = {};
constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) {
constructor (
serverConfig: ServerConfig,
db: DatabaseInterface,
ethClient: EthClient,
ethProvider: ethers.providers.BaseProvider,
jobQueue: JobQueue,
ipfsClient: IPFSClient
) {
this._serverConfig = serverConfig;
this._db = db;
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._jobQueue = jobQueue;
this._ipfsClient = ipfsClient;
this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient);
}
@ -192,7 +232,7 @@ export class Indexer {
return this._db.getEvent(id);
}
async fetchBlockEvents (block: DeepPartial<BlockProgressInterface>, fetchAndSaveEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<BlockProgressInterface>): Promise<BlockProgressInterface> {
async fetchBlockWithEvents (block: DeepPartial<BlockProgressInterface>, fetchAndSaveEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<BlockProgressInterface>): Promise<BlockProgressInterface> {
assert(block.blockHash);
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`);
@ -202,6 +242,35 @@ export class Indexer {
return blockProgress;
}
async fetchBlockEvents (block: DeepPartial<BlockProgressInterface>, fetchEvents: (block: DeepPartial<BlockProgressInterface>) => Promise<DeepPartial<EventInterface>[]>): Promise<DeepPartial<EventInterface>[]> {
assert(block.blockHash);
log(`getBlockEvents: fetching from upstream server ${block.blockHash}`);
console.time('time:indexer#fetchBlockEvents-fetchAndSaveEvents');
const events = await fetchEvents(block);
console.timeEnd('time:indexer#fetchBlockEvents-fetchAndSaveEvents');
log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`);
return events;
}
async saveBlockProgress (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.saveBlockProgress(dbTx, block);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>> {
return this._db.getBlockEvents(blockHash, where, queryOptions);
}
@ -288,6 +357,20 @@ export class Indexer {
return res;
}
async saveEvents (dbEvents: EventInterface[]): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.saveEvents(dbTx, dbEvents);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> {
return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber);
}
@ -304,7 +387,7 @@ export class Indexer {
return this._db.getEventsInRange(fromBlockNumber, toBlockNumber);
}
async isWatchedContract (address : string): Promise<ContractInterface | undefined> {
isWatchedContract (address : string): ContractInterface | undefined {
return this._watchedContracts[address];
}
@ -364,6 +447,453 @@ export class Indexer {
);
}
getIPLDData (ipldBlock: IPLDBlockInterface): any {
return codec.decode(Buffer.from(ipldBlock.data));
}
async pushToIPFS (data: any): Promise<void> {
await this._ipfsClient.push(data);
}
isIPFSConfigured (): boolean {
const ipfsAddr = this._serverConfig.ipfsApiAddr;
// Return false if ipfsAddr is undefined | null | empty string.
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
}
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
// Get current hookStatus.
const ipldStatus = await this._db.getIPLDStatus();
assert(ipldStatus, 'IPLD status not found');
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false);
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
assert(blocksAtHeight.length === 1);
return blocksAtHeight[0];
}
async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise<void> {
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
// Getting the block for checkpoint.
const block = await this.getBlockProgress(blockHash);
assert(block);
// For each contract, merge the diff till now to create a checkpoint.
for (const contract of contracts) {
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contract.address];
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
const initBlockNumber = ipldStatus.init;
// Check if contract has checkpointing on.
// Check if it's time for a checkpoint or the init is in current block.
if (
contract.checkpoint &&
(block.blockNumber % checkpointInterval === 0 || initBlockNumber === block.blockNumber)
) {
await this.createCheckpoint(indexer, contract.address, block);
}
}
}
async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise<string | undefined> {
// Getting the block for checkpoint.
let block;
if (blockHash) {
block = await this.getBlockProgress(blockHash);
} else {
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
block = await this.getLatestHooksProcessedBlock();
}
assert(block);
const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, block);
assert(checkpointBlockHash, 'Checkpoint not created');
// Push checkpoint to IPFS if configured.
if (this.isIPFSConfigured()) {
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: StateKind.Checkpoint });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
}
return checkpointBlockHash;
}
async createStateCheckpoint (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Create a checkpoint from the hook data without being concerned about diffs.
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Checkpoint);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async createInit (
indexer: IndexerInterface,
blockHash: string,
blockNumber: number
): Promise<void> {
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
// Create an initial state for each contract.
for (const contract of contracts) {
// Check if contract has checkpointing on.
if (contract.checkpoint) {
// Check if starting block not reached yet.
if (blockNumber < contract.startingBlock) {
continue;
}
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contract.address];
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
// Check if a 'init' IPLDBlock already exists.
// Or if a 'diff' IPLDBlock already exists.
// Or if a 'checkpoint' IPLDBlock already exists.
// (A watcher with imported state won't have an init IPLDBlock, but it will have the imported checkpoint)
if (
ipldStatus.init ||
ipldStatus.diff ||
ipldStatus.checkpoint
) {
continue;
}
// Call initial state hook.
assert(indexer.processInitialState);
const stateData = await indexer.processInitialState(contract.address, blockHash);
const block = await this.getBlockProgress(blockHash);
assert(block);
const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, StateKind.Init);
await this.saveOrUpdateIPLDBlock(ipldBlock);
// Push initial state to IPFS if configured.
if (this.isIPFSConfigured()) {
const ipldData = this.getIPLDData(ipldBlock);
await this.pushToIPFS(ipldData);
}
}
}
}
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
const block = await this.getBlockProgress(blockHash);
assert(block);
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Create a staged diff block.
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.DiffStaged);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async finalizeDiffStaged (blockHash: string): Promise<void> {
const block = await this.getBlockProgress(blockHash);
assert(block);
// Get all the staged diff blocks for the given blockHash.
const stagedBlocks = await this._db.getIPLDBlocks({ block, kind: StateKind.DiffStaged });
// For each staged block, create a diff block.
for (const stagedBlock of stagedBlocks) {
const data = codec.decode(Buffer.from(stagedBlock.data));
await this.createDiff(stagedBlock.contractAddress, block, data);
}
// Remove all the staged diff blocks for current blockNumber.
// (Including staged diff blocks associated with pruned blocks)
await this.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
}
async createDiff (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contractAddress];
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
// Get the latest checkpoint block number.
const checkpointBlockNumber = ipldStatus.checkpoint;
if (!checkpointBlockNumber) {
// Get the initial state block number.
const initBlockNumber = ipldStatus.init;
// There should be an initial state at least.
assert(initBlockNumber, 'No initial state found');
} else if (checkpointBlockNumber === block.blockNumber) {
// Check if the latest checkpoint is in the same block if block number is same.
const checkpoint = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint);
assert(checkpoint);
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash');
}
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Diff);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async createCheckpoint (indexer: IndexerInterface, contractAddress: string, currentBlock: BlockProgressInterface): Promise<string | undefined> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (currentBlock.blockNumber < contract.startingBlock) {
return;
}
// Make sure the block is marked complete.
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
// Get current hookStatus.
const ipldStatus = await this._db.getIPLDStatus();
assert(ipldStatus);
// Make sure the hooks have been processed for the block.
assert(currentBlock.blockNumber <= ipldStatus.latestHooksBlockNumber, 'Block for a checkpoint should have hooks processed');
// Call state checkpoint hook and check if default checkpoint is disabled.
assert(indexer.processStateCheckpoint);
const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash);
if (disableDefaultCheckpoint) {
// Return if default checkpoint is disabled.
// Return block hash for checkpoint CLI.
return currentBlock.blockHash;
}
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
let prevNonDiffBlock: IPLDBlockInterface;
let diffStartBlockNumber: number;
const checkpointBlock = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber - 1);
if (checkpointBlock) {
const checkpointBlockNumber = checkpointBlock.block.blockNumber;
prevNonDiffBlock = checkpointBlock;
diffStartBlockNumber = checkpointBlockNumber;
// Update IPLD status map with the latest checkpoint info.
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
// (job-runner won't have the updated ipld status)
this.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber });
} else {
// There should be an initial state at least.
const initBlock = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Init);
assert(initBlock, 'No initial state found');
prevNonDiffBlock = initBlock;
// Take block number previous to initial state block as the checkpoint is to be created in the same block.
diffStartBlockNumber = initBlock.block.blockNumber - 1;
}
// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
const diffBlocks = await this._db.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, currentBlock.blockNumber);
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
const data = {
state: prevNonDiffBlockData.state
};
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
data.state = _.merge(data.state, diff.state);
}
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, StateKind.Checkpoint);
await this.saveOrUpdateIPLDBlock(ipldBlock);
return currentBlock.blockHash;
}
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise<any> {
console.time('time:ipld-indexer#prepareIPLDBlock');
let ipldBlock: IPLDBlockInterface;
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contractAddress];
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
// Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress to update.
let currentIPLDBlock: IPLDBlockInterface | undefined;
const prevIPLDBlockNumber = ipldStatus[kind];
// Fetch from DB for previous IPLD block or for checkpoint kind.
if (kind === 'checkpoint' || (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber)) {
const currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(currentIPLDBlocks.length <= 1);
currentIPLDBlock = currentIPLDBlocks[0];
}
if (currentIPLDBlock) {
// Update current IPLDBlock of same kind if it exists.
ipldBlock = currentIPLDBlock;
// Update the data field.
const oldData = codec.decode(Buffer.from(ipldBlock.data));
data = _.merge(oldData, data);
} else {
// Create a new IPLDBlock instance.
ipldBlock = this._db.getNewIPLDBlock();
// Fetch the parent IPLDBlock.
const parentIPLDBlock = await this._db.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
// Setting the meta-data for an IPLDBlock (done only once per IPLD block).
data.meta = {
id: contractAddress,
kind,
parent: {
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
},
ethBlock: {
cid: {
'/': block.cid
},
num: block.blockNumber
}
};
}
// Encoding the data using dag-cbor codec.
const bytes = codec.encode(data);
// Calculating sha256 (multi)hash of the encoded data.
const hash = await sha256.digest(bytes);
// Calculating the CID: v1, code: dag-cbor, hash.
const cid = CID.create(1, codec.code, hash);
// Update ipldBlock with new data.
ipldBlock = Object.assign(ipldBlock, {
block,
contractAddress,
cid: cid.toString(),
kind: data.meta.kind,
data: Buffer.from(bytes)
});
console.timeEnd('time:ipld-indexer#prepareIPLDBlock');
return ipldBlock;
}
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlockInterface[]> {
const block = await this.getBlockProgress(blockHash);
assert(block);
return this._db.getIPLDBlocks({ block });
}
async getIPLDBlockByCid (cid: string): Promise<IPLDBlockInterface | undefined> {
const ipldBlocks = await this._db.getIPLDBlocks({ cid });
// There can be only one IPLDBlock with a particular cid.
assert(ipldBlocks.length <= 1);
return ipldBlocks[0];
}
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._db.saveOrUpdateIPLDBlock(dbTx, ipldBlock);
await dbTx.commitTransaction();
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[res.contractAddress];
assert(ipldStatus, `IPLD status for contract ${res.contractAddress} not found`);
// Update the IPLD status for the kind.
ipldStatus[res.kind] = res.block.blockNumber;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
await this._db.removeIPLDBlocks(dbTx, blockNumber, kind);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async fetchIPLDStatus (): Promise<void> {
const contracts = Object.values(this._watchedContracts);
for (const contract of contracts) {
const initIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Init);
const diffIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Diff);
const diffStagedIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.DiffStaged);
const checkpointIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Checkpoint);
this._ipldStatusMap[contract.address] = {
init: initIPLDBlock?.block.blockNumber,
diff: diffIPLDBlock?.block.blockNumber,
diff_staged: diffStagedIPLDBlock?.block.blockNumber,
checkpoint: checkpointIPLDBlock?.block.blockNumber
};
}
}
async updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise<void> {
// Get and update IPLD status for the contract.
const ipldStatusOld = this._ipldStatusMap[address];
this._ipldStatusMap[address] = _.merge(ipldStatusOld, ipldStatus);
}
parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any } {
const eventName = logDescription.name;

View File

@ -1,270 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import { Between, ConnectionOptions, FindConditions, Repository } from 'typeorm';
import assert from 'assert';
import { Pool } from 'pg';
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
import { Database } from './database';
import { MAX_REORG_DEPTH } from './constants';
export class IPLDDatabase extends Database {
_pgPool: Pool
constructor (config: ConnectionOptions) {
super(config);
assert(config.type === 'postgres');
this._pgPool = new Pool({
user: config.username,
host: config.host,
database: config.database,
password: config.password,
port: config.port
});
}
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
let queryBuilder = repo.createQueryBuilder('ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.orderBy('block.block_number', 'DESC');
// Filter out blocks after the provided block number.
if (blockNumber) {
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
}
// Filter using kind if specified else avoid diff_staged block.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged });
// Get the first three entries.
queryBuilder.limit(3);
const results = await queryBuilder.getMany();
if (results.length) {
// Sort by (block number desc, id desc) to get the latest entry.
// At same height, IPLD blocks are expected in order ['init', 'diff', 'checkpoint'],
// and are given preference in order ['checkpoint', 'diff', 'init']
results.sort((result1, result2) => {
if (result1.block.blockNumber === result2.block.blockNumber) {
return (result1.id > result2.id) ? -1 : 1;
} else {
return (result1.block.blockNumber > result2.block.blockNumber) ? -1 : 1;
}
});
return results[0];
}
}
async getPrevIPLDBlock (repo: Repository<IPLDBlockInterface>, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlockInterface | undefined> {
const heirerchicalQuery = `
WITH RECURSIVE cte_query AS
(
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
1 as depth,
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
ipld_block i ON i.block_id = b.id
AND i.contract_address = $2
WHERE
b.block_hash = $1
UNION ALL
SELECT
b.block_hash,
b.block_number,
b.parent_hash,
c.depth + 1,
i.id,
i.kind
FROM
block_progress b
LEFT JOIN
ipld_block i
ON i.block_id = b.id
AND i.contract_address = $2
INNER JOIN
cte_query c ON c.parent_hash = b.block_hash
WHERE
c.depth < $3
)
SELECT
block_number, id, kind
FROM
cte_query
ORDER BY block_number DESC, id DESC
`;
// Fetching block and id for previous IPLDBlock in frothy region.
const queryResult = await repo.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
const latestRequiredResult = kind
? queryResult.find((obj: any) => obj.kind === kind)
: queryResult.find((obj: any) => obj.id);
let result: IPLDBlockInterface | undefined;
if (latestRequiredResult) {
result = await repo.findOne(latestRequiredResult.id, { relations: ['block'] });
} else {
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
// Filter out IPLDBlocks from pruned blocks.
const canonicalBlockNumber = queryResult.pop().block_number + 1;
let queryBuilder = repo.createQueryBuilder('ipld_block')
.leftJoinAndSelect('ipld_block.block', 'block')
.where('block.is_pruned = false')
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
.orderBy('block.block_number', 'DESC');
// Filter using kind if specified else order by id to give preference to checkpoint.
queryBuilder = kind
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
// Get the first entry.
queryBuilder.limit(1);
result = await queryBuilder.getOne();
}
return result;
}
async getIPLDBlocks (repo: Repository<IPLDBlockInterface>, where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]> {
return repo.find({ where, relations: ['block'] });
}
async getDiffIPLDBlocksInRange (repo: Repository<IPLDBlockInterface>, contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlockInterface[]> {
return repo.find({
relations: ['block'],
where: {
contractAddress,
kind: StateKind.Diff,
block: {
isPruned: false,
blockNumber: Between(startblock + 1, endBlock)
}
},
order: {
block: 'ASC'
}
});
}
async saveOrUpdateIPLDBlock (repo: Repository<IPLDBlockInterface>, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
let updatedData: {[key: string]: any};
console.time('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
if (ipldBlock.id) {
// Using pg query as workaround for typeorm memory issue when saving checkpoint with large sized data.
const { rows } = await this._pgPool.query(`
UPDATE ipld_block
SET block_id = $1, contract_address = $2, cid = $3, kind = $4, data = $5
WHERE id = $6
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data, ipldBlock.id]);
updatedData = rows[0];
} else {
const { rows } = await this._pgPool.query(`
INSERT INTO ipld_block(block_id, contract_address, cid, kind, data)
VALUES($1, $2, $3, $4, $5)
RETURNING *
`, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data]);
updatedData = rows[0];
}
console.timeEnd('time:ipld-database#saveOrUpdateIPLDBlock-DB-query');
assert(updatedData);
return {
block: ipldBlock.block,
contractAddress: updatedData.contract_address,
cid: updatedData.cid,
kind: updatedData.kind,
data: updatedData.data,
id: updatedData.id
};
}
async removeIPLDBlocks (repo: Repository<IPLDBlockInterface>, blockNumber: number, kind: string): Promise<void> {
const entities = await repo.find({ relations: ['block'], where: { block: { blockNumber }, kind } });
// Delete if entities found.
if (entities.length) {
await repo.delete(entities.map((entity) => entity.id));
}
}
async removeIPLDBlocksAfterBlock (repo: Repository<IPLDBlockInterface>, blockNumber: number): Promise<void> {
// Use raw SQL as TypeORM curently doesn't support delete via 'join' or 'using'
const deleteQuery = `
DELETE FROM
ipld_block
USING block_progress
WHERE
ipld_block.block_id = block_progress.id
AND block_progress.block_number > $1;
`;
await repo.query(deleteQuery, [blockNumber]);
}
async getIPLDStatus (repo: Repository<IpldStatusInterface>): Promise<IpldStatusInterface | undefined> {
return repo.findOne();
}
async updateIPLDStatusHooksBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
let entity = await repo.findOne();
if (!entity) {
entity = repo.create({
latestHooksBlockNumber: blockNumber,
latestCheckpointBlockNumber: -1,
latestIPFSBlockNumber: -1
});
}
if (force || blockNumber > entity.latestHooksBlockNumber) {
entity.latestHooksBlockNumber = blockNumber;
}
return repo.save(entity);
}
async updateIPLDStatusCheckpointBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (force || blockNumber > entity.latestCheckpointBlockNumber) {
entity.latestCheckpointBlockNumber = blockNumber;
}
return repo.save(entity);
}
async updateIPLDStatusIPFSBlock (repo: Repository<IpldStatusInterface>, blockNumber: number, force?: boolean): Promise<IpldStatusInterface> {
const entity = await repo.findOne();
assert(entity);
if (force || blockNumber > entity.latestIPFSBlockNumber) {
entity.latestIPFSBlockNumber = blockNumber;
}
return repo.save(entity);
}
}

View File

@ -1,501 +0,0 @@
//
// Copyright 2021 Vulcanize, Inc.
//
import assert from 'assert';
import { ethers } from 'ethers';
import { sha256 } from 'multiformats/hashes/sha2';
import { CID } from 'multiformats/cid';
import _ from 'lodash';
import { EthClient } from '@cerc-io/ipld-eth-client';
import * as codec from '@ipld/dag-cbor';
import {
IPLDDatabaseInterface,
IndexerInterface,
BlockProgressInterface,
IPLDBlockInterface,
StateKind
} from './types';
import { Indexer } from './indexer';
import { ServerConfig } from './config';
import { IPFSClient } from './ipfs';
import { JobQueue } from './job-queue';
export interface IpldStatus {
init?: number;
diff?: number;
checkpoint?: number;
// eslint-disable-next-line camelcase
diff_staged?: number;
}
export class IPLDIndexer extends Indexer {
_serverConfig: ServerConfig;
_ipldDb: IPLDDatabaseInterface;
_ipfsClient: IPFSClient;
_ipldStatusMap: { [key: string]: IpldStatus } = {};
constructor (
serverConfig: ServerConfig,
ipldDb: IPLDDatabaseInterface,
ethClient: EthClient,
ethProvider: ethers.providers.BaseProvider,
jobQueue: JobQueue,
ipfsClient: IPFSClient
) {
super(ipldDb, ethClient, ethProvider, jobQueue);
this._serverConfig = serverConfig;
this._ipldDb = ipldDb;
this._ipfsClient = ipfsClient;
}
getIPLDData (ipldBlock: IPLDBlockInterface): any {
return codec.decode(Buffer.from(ipldBlock.data));
}
async pushToIPFS (data: any): Promise<void> {
await this._ipfsClient.push(data);
}
isIPFSConfigured (): boolean {
const ipfsAddr = this._serverConfig.ipfsApiAddr;
// Return false if ipfsAddr is undefined | null | empty string.
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
}
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
// Get current hookStatus.
const ipldStatus = await this._ipldDb.getIPLDStatus();
assert(ipldStatus, 'IPLD status not found');
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false);
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
assert(blocksAtHeight.length === 1);
return blocksAtHeight[0];
}
async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise<void> {
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
// Getting the block for checkpoint.
const block = await this.getBlockProgress(blockHash);
assert(block);
// For each contract, merge the diff till now to create a checkpoint.
for (const contract of contracts) {
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contract.address];
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
const initBlockNumber = ipldStatus.init;
// Check if contract has checkpointing on.
// Check if it's time for a checkpoint or the init is in current block.
if (
contract.checkpoint &&
(block.blockNumber % checkpointInterval === 0 || initBlockNumber === block.blockNumber)
) {
await this.createCheckpoint(indexer, contract.address, block);
}
}
}
async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise<string | undefined> {
// Getting the block for checkpoint.
let block;
if (blockHash) {
block = await this.getBlockProgress(blockHash);
} else {
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
block = await this.getLatestHooksProcessedBlock();
}
assert(block);
const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, block);
assert(checkpointBlockHash, 'Checkpoint not created');
// Push checkpoint to IPFS if configured.
if (this.isIPFSConfigured()) {
const checkpointIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind: StateKind.Checkpoint });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
}
return checkpointBlockHash;
}
async createStateCheckpoint (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Create a checkpoint from the hook data without being concerned about diffs.
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Checkpoint);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async createInit (
indexer: IndexerInterface,
blockHash: string,
blockNumber: number
): Promise<void> {
// Get all the contracts.
const contracts = Object.values(this._watchedContracts);
// Create an initial state for each contract.
for (const contract of contracts) {
// Check if contract has checkpointing on.
if (contract.checkpoint) {
// Check if starting block not reached yet.
if (blockNumber < contract.startingBlock) {
continue;
}
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contract.address];
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
// Check if a 'init' IPLDBlock already exists.
// Or if a 'diff' IPLDBlock already exists.
// Or if a 'checkpoint' IPLDBlock already exists.
// (A watcher with imported state won't have an init IPLDBlock, but it will have the imported checkpoint)
if (
ipldStatus.init ||
ipldStatus.diff ||
ipldStatus.checkpoint
) {
continue;
}
// Call initial state hook.
assert(indexer.processInitialState);
const stateData = await indexer.processInitialState(contract.address, blockHash);
const block = await this.getBlockProgress(blockHash);
assert(block);
const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, StateKind.Init);
await this.saveOrUpdateIPLDBlock(ipldBlock);
// Push initial state to IPFS if configured.
if (this.isIPFSConfigured()) {
const ipldData = this.getIPLDData(ipldBlock);
await this.pushToIPFS(ipldData);
}
}
}
}
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
const block = await this.getBlockProgress(blockHash);
assert(block);
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Create a staged diff block.
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.DiffStaged);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async finalizeDiffStaged (blockHash: string): Promise<void> {
const block = await this.getBlockProgress(blockHash);
assert(block);
// Get all the staged diff blocks for the given blockHash.
const stagedBlocks = await this._ipldDb.getIPLDBlocks({ block, kind: StateKind.DiffStaged });
// For each staged block, create a diff block.
for (const stagedBlock of stagedBlocks) {
const data = codec.decode(Buffer.from(stagedBlock.data));
await this.createDiff(stagedBlock.contractAddress, block, data);
}
// Remove all the staged diff blocks for current blockNumber.
// (Including staged diff blocks associated with pruned blocks)
await this.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
}
async createDiff (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (block.blockNumber < contract.startingBlock) {
return;
}
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contractAddress];
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
// Get the latest checkpoint block number.
const checkpointBlockNumber = ipldStatus.checkpoint;
if (!checkpointBlockNumber) {
// Get the initial state block number.
const initBlockNumber = ipldStatus.init;
// There should be an initial state at least.
assert(initBlockNumber, 'No initial state found');
} else if (checkpointBlockNumber === block.blockNumber) {
// Check if the latest checkpoint is in the same block if block number is same.
const checkpoint = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint);
assert(checkpoint);
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash');
}
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Diff);
await this.saveOrUpdateIPLDBlock(ipldBlock);
}
async createCheckpoint (indexer: IndexerInterface, contractAddress: string, currentBlock: BlockProgressInterface): Promise<string | undefined> {
// Get the contract.
const contract = this._watchedContracts[contractAddress];
assert(contract, `Contract ${contractAddress} not watched`);
if (currentBlock.blockNumber < contract.startingBlock) {
return;
}
// Make sure the block is marked complete.
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
// Get current hookStatus.
const ipldStatus = await this._ipldDb.getIPLDStatus();
assert(ipldStatus);
// Make sure the hooks have been processed for the block.
assert(currentBlock.blockNumber <= ipldStatus.latestHooksBlockNumber, 'Block for a checkpoint should have hooks processed');
// Call state checkpoint hook and check if default checkpoint is disabled.
assert(indexer.processStateCheckpoint);
const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash);
if (disableDefaultCheckpoint) {
// Return if default checkpoint is disabled.
// Return block hash for checkpoint CLI.
return currentBlock.blockHash;
}
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
let prevNonDiffBlock: IPLDBlockInterface;
let diffStartBlockNumber: number;
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber - 1);
if (checkpointBlock) {
const checkpointBlockNumber = checkpointBlock.block.blockNumber;
prevNonDiffBlock = checkpointBlock;
diffStartBlockNumber = checkpointBlockNumber;
// Update IPLD status map with the latest checkpoint info.
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
// (job-runner won't have the updated ipld status)
this.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber });
} else {
// There should be an initial state at least.
const initBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Init);
assert(initBlock, 'No initial state found');
prevNonDiffBlock = initBlock;
// Take block number previous to initial state block as the checkpoint is to be created in the same block.
diffStartBlockNumber = initBlock.block.blockNumber - 1;
}
// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, currentBlock.blockNumber);
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
const data = {
state: prevNonDiffBlockData.state
};
for (const diffBlock of diffBlocks) {
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
data.state = _.merge(data.state, diff.state);
}
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, StateKind.Checkpoint);
await this.saveOrUpdateIPLDBlock(ipldBlock);
return currentBlock.blockHash;
}
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise<any> {
console.time('time:ipld-indexer#prepareIPLDBlock');
let ipldBlock: IPLDBlockInterface;
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[contractAddress];
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
// Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress to update.
let currentIPLDBlock: IPLDBlockInterface | undefined;
const prevIPLDBlockNumber = ipldStatus[kind];
// Fetch from DB for previous IPLD block or for checkpoint kind.
if (kind === 'checkpoint' || (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber)) {
const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(currentIPLDBlocks.length <= 1);
currentIPLDBlock = currentIPLDBlocks[0];
}
if (currentIPLDBlock) {
// Update current IPLDBlock of same kind if it exists.
ipldBlock = currentIPLDBlock;
// Update the data field.
const oldData = codec.decode(Buffer.from(ipldBlock.data));
data = _.merge(oldData, data);
} else {
// Create a new IPLDBlock instance.
ipldBlock = this._ipldDb.getNewIPLDBlock();
// Fetch the parent IPLDBlock.
const parentIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
// Setting the meta-data for an IPLDBlock (done only once per IPLD block).
data.meta = {
id: contractAddress,
kind,
parent: {
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
},
ethBlock: {
cid: {
'/': block.cid
},
num: block.blockNumber
}
};
}
// Encoding the data using dag-cbor codec.
const bytes = codec.encode(data);
// Calculating sha256 (multi)hash of the encoded data.
const hash = await sha256.digest(bytes);
// Calculating the CID: v1, code: dag-cbor, hash.
const cid = CID.create(1, codec.code, hash);
// Update ipldBlock with new data.
ipldBlock = Object.assign(ipldBlock, {
block,
contractAddress,
cid: cid.toString(),
kind: data.meta.kind,
data: Buffer.from(bytes)
});
console.timeEnd('time:ipld-indexer#prepareIPLDBlock');
return ipldBlock;
}
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlockInterface[]> {
const block = await this.getBlockProgress(blockHash);
assert(block);
return this._ipldDb.getIPLDBlocks({ block });
}
async getIPLDBlockByCid (cid: string): Promise<IPLDBlockInterface | undefined> {
const ipldBlocks = await this._ipldDb.getIPLDBlocks({ cid });
// There can be only one IPLDBlock with a particular cid.
assert(ipldBlocks.length <= 1);
return ipldBlocks[0];
}
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
const dbTx = await this._db.createTransactionRunner();
let res;
try {
res = await this._ipldDb.saveOrUpdateIPLDBlock(dbTx, ipldBlock);
await dbTx.commitTransaction();
// Get IPLD status for the contract.
const ipldStatus = this._ipldStatusMap[res.contractAddress];
assert(ipldStatus, `IPLD status for contract ${res.contractAddress} not found`);
// Update the IPLD status for the kind.
ipldStatus[res.kind] = res.block.blockNumber;
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
return res;
}
async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise<void> {
const dbTx = await this._db.createTransactionRunner();
try {
await this._ipldDb.removeIPLDBlocks(dbTx, blockNumber, kind);
await dbTx.commitTransaction();
} catch (error) {
await dbTx.rollbackTransaction();
throw error;
} finally {
await dbTx.release();
}
}
async fetchIPLDStatus (): Promise<void> {
const contracts = Object.values(this._watchedContracts);
for (const contract of contracts) {
const initIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Init);
const diffIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Diff);
const diffStagedIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.DiffStaged);
const checkpointIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Checkpoint);
this._ipldStatusMap[contract.address] = {
init: initIPLDBlock?.block.blockNumber,
diff: diffIPLDBlock?.block.blockNumber,
diff_staged: diffStagedIPLDBlock?.block.blockNumber,
checkpoint: checkpointIPLDBlock?.block.blockNumber
};
}
}
async updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise<void> {
// Get and update IPLD status for the contract.
const ipldStatusOld = this._ipldStatusMap[address];
this._ipldStatusMap[address] = _.merge(ipldStatusOld, ipldStatus);
}
}

View File

@ -17,7 +17,7 @@ import {
QUEUE_EVENT_PROCESSING
} from './constants';
import { JobQueue } from './job-queue';
import { EventInterface, IndexerInterface, IPLDIndexerInterface, SyncStatusInterface } from './types';
import { EventInterface, IndexerInterface, SyncStatusInterface } from './types';
import { wait } from './misc';
import { createPruningJob, processBatchEvents } from './common';
import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics';
@ -25,7 +25,7 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber
const log = debug('vulcanize:job-runner');
export class JobRunner {
_indexer: IndexerInterface | IPLDIndexerInterface
_indexer: IndexerInterface
_jobQueue: JobQueue
_jobQueueConfig: JobQueueConfig
_blockProcessStartTime?: Date
@ -246,13 +246,11 @@ export class JobRunner {
// Delay required to process block.
await wait(jobDelayInMilliSecs);
console.time('time:job-runner#_indexBlock-fetch-block-events');
blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
blockProgress = await this._indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp });
console.timeEnd('time:job-runner#_indexBlock-fetch-block-events');
}
if (this._indexer.processBlock) {
await this._indexer.processBlock(blockProgress);
}
await this._indexer.processBlock(blockProgress);
// Push job to event processing queue.
// Block with all events processed or no events will not be processed again due to check in _processEvents.
@ -299,7 +297,7 @@ export class JobRunner {
assert(this._indexer.cacheContract);
this._indexer.cacheContract(contract);
const ipldIndexer = this._indexer as IPLDIndexerInterface;
const ipldIndexer = this._indexer;
if (ipldIndexer.updateIPLDStatusMap) {
ipldIndexer.updateIPLDStatusMap(contract.address, {});
}

View File

@ -68,6 +68,11 @@ export const eventProcessingLoadEntityDBQueryDuration = new client.Histogram({
help: 'Duration of DB query made in event processing'
});
export const eventProcessingEthCallDuration = new client.Histogram({
name: 'event_processing_eth_call_duration_seconds',
help: 'Duration of eth_calls made in event processing'
});
// Export metrics on a server
const app: Application = express();
@ -131,7 +136,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise<vo
// eslint-disable-next-line no-new
new client.Gauge({
name: 'database_size_bytes',
help: 'Total entries in event table',
help: 'Watcher database sizes in bytes',
labelNames: ['type'] as const,
async collect () {
const [

View File

@ -7,8 +7,7 @@ import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';
import { ServerConfig } from './config';
import { Where, QueryOptions } from './database';
import { IpldStatus } from './ipld-indexer';
import { ValueResult } from './indexer';
import { ValueResult, IpldStatus } from './indexer';
export enum StateKind {
Diff = 'diff',
@ -91,7 +90,7 @@ export interface IndexerInterface {
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>;
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
fetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>
fetchBlockWithEvents (block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>
removeUnknownEvents (block: BlockProgressInterface): Promise<void>
updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
@ -101,7 +100,7 @@ export interface IndexerInterface {
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>;
processEvent (event: EventInterface): Promise<void>;
parseEventNameAndArgs?: (kind: string, logObj: any) => any;
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
isWatchedContract: (address: string) => ContractInterface | undefined;
getContractsByKind?: (kind: string) => ContractInterface[];
cacheContract?: (contract: ContractInterface) => void;
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
@ -110,13 +109,10 @@ export interface IndexerInterface {
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
processInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
processBlock?: (blockProgres: BlockProgressInterface) => Promise<void>
processBlock: (blockProgres: BlockProgressInterface) => Promise<void>
processBlockAfterEvents?: (blockHash: string) => Promise<void>
getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise<ValueResult>
updateSubgraphState?: (contractAddress: string, data: any) => void
}
export interface IPLDIndexerInterface extends IndexerInterface {
updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise<void>
getIPLDData (ipldBlock: IPLDBlockInterface): any
}
@ -140,18 +136,17 @@ export interface DatabaseInterface {
getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>;
getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>): Promise<BlockProgressInterface>;
updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise<BlockProgressInterface>
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEvents (queryRunner: QueryRunner, events: DeepPartial<EventInterface>[]): Promise<void>;
saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<BlockProgressInterface>;
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions<Entity> | FindConditions<Entity>): Promise<void>;
getContracts?: () => Promise<ContractInterface[]>
saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<ContractInterface>
}
export interface IPLDDatabaseInterface extends DatabaseInterface {
getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined>
getIPLDBlocks (where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]>
getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlockInterface[]>