diff --git a/lerna.json b/lerna.json index af906086..e2c04be2 100644 --- a/lerna.json +++ b/lerna.json @@ -4,5 +4,10 @@ ], "version": "0.1.0", "npmClient": "yarn", - "useWorkspaces": true + "useWorkspaces": true, + "command": { + "publish": { + "registry": "https://git.vdb.to/api/packages/cerc-io/npm/" + } + } } diff --git a/package.json b/package.json index 51124d9e..dba7fb35 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "build": "lerna run build --stream", "build:watch": "lerna run build --stream --parallel -- -w", "db:reset": "sudo ./scripts/reset-dbs.sh", - "prepare": "husky install" + "prepare": "husky install", + "publish:workspace": "yarn build && lerna publish" } } diff --git a/packages/cache/.npmignore b/packages/cache/.npmignore new file mode 100644 index 00000000..2739a4b8 --- /dev/null +++ b/packages/cache/.npmignore @@ -0,0 +1,5 @@ +/src/ +index.ts +tsconfig.json +.eslintrc.json +.eslintignore diff --git a/packages/cache/package.json b/packages/cache/package.json index 233e0aed..fbcbaeb0 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -2,7 +2,6 @@ "name": "@cerc-io/cache", "version": "0.1.0", "description": "Generic object cache", - "private": true, "main": "dist/index.js", "scripts": { "lint": "eslint .", diff --git a/packages/codegen/src/templates/database-template.handlebars b/packages/codegen/src/templates/database-template.handlebars index 9ee4dbf9..82169fe1 100644 --- a/packages/codegen/src/templates/database-template.handlebars +++ b/packages/codegen/src/templates/database-template.handlebars @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; import path from 'path'; -import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -18,7 +18,7 @@ import { IPLDBlock } from './entity/IPLDBlock'; import { {{query.entityName}} } from './entity/{{query.entityName}}'; {{/each}} -export class Database implements IPLDDatabaseInterface { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -183,11 +183,23 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); + } + + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index a4eb1c72..d67b6596 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -16,8 +16,8 @@ import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { - IPLDIndexer as BaseIndexer, - IPLDIndexerInterface, + Indexer as BaseIndexer, + IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, @@ -31,7 +31,8 @@ import { {{/if}} IPFSClient, StateKind, - IpldStatus as IpldStatusInterface + IpldStatus as IpldStatusInterface, + ResultIPLDBlock } from '@cerc-io/util'; {{#if (subgraphPath)}} import { GraphWatcher } from '@cerc-io/graph-node'; @@ -88,21 +89,7 @@ export type ResultEvent = { proof: string; }; -export type ResultIPLDBlock = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - contractAddress: string; - cid: string; - kind: string; - data: string; -}; - -export class Indexer implements IPLDIndexerInterface { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _ethProvider: BaseProvider @@ -603,7 +590,7 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -655,8 +642,8 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -866,7 +853,7 @@ export class Indexer implements IPLDIndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/eden-watcher/src/database.ts b/packages/eden-watcher/src/database.ts index 282fe16a..2a93a88c 100644 --- a/packages/eden-watcher/src/database.ts +++ b/packages/eden-watcher/src/database.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; import path from 'path'; -import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -15,7 +15,7 @@ import { IpldStatus } from './entity/IpldStatus'; import { BlockProgress } from './entity/BlockProgress'; import { IPLDBlock } from './entity/IPLDBlock'; -export class Database implements IPLDDatabaseInterface { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -143,11 +143,23 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); + } + + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index 1383abb9..e0fe8cd6 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -16,7 +16,7 @@ import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { - IPLDIndexer as BaseIndexer, + Indexer as BaseIndexer, UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, @@ -25,9 +25,10 @@ import { BlockHeight, IPFSClient, StateKind, - IPLDIndexerInterface, + IndexerInterface, IpldStatus as IpldStatusInterface, - ValueResult + ValueResult, + ResultIPLDBlock } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; @@ -90,21 +91,7 @@ export type ResultEvent = { proof: string; }; -export type ResultIPLDBlock = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - contractAddress: string; - cid: string; - kind: string; - data: string; -}; - -export class Indexer implements IPLDIndexerInterface { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _ethProvider: BaseProvider @@ -540,7 +527,7 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -588,8 +575,8 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -1120,7 +1107,7 @@ export class Indexer implements IPLDIndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/erc20-watcher/src/database.ts b/packages/erc20-watcher/src/database.ts index f3f95321..f278d888 100644 --- a/packages/erc20-watcher/src/database.ts +++ b/packages/erc20-watcher/src/database.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, FindManyOptions, QueryRunner } from 'typeorm'; import path from 'path'; -import { Database as BaseDatabase, QueryOptions, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Allowance } from './entity/Allowance'; import { Balance } from './entity/Balance'; @@ -14,8 +14,10 @@ import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { BlockProgress } from './entity/BlockProgress'; +import { IPLDBlock } from './entity/IPLDBlock'; +import { IpldStatus } from './entity/IpldStatus'; -export class Database { +export class Database implements DatabaseInterface { _config: ConnectionOptions _conn!: Connection _baseDatabase: BaseDatabase; @@ -39,6 +41,47 @@ export class Database { return this._baseDatabase.close(); } + getNewIPLDBlock (): IPLDBlock { + return new IPLDBlock(); + } + + async getIPLDBlocks (where: FindConditions): Promise { + const repo = this._conn.getRepository(IPLDBlock); + + return this._baseDatabase.getIPLDBlocks(repo, where); + } + + async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { + const repo = this._conn.getRepository(IPLDBlock); + + return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber); + } + + // Fetch all diff IPLDBlocks after the specified block number. + async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise { + const repo = this._conn.getRepository(IPLDBlock); + + return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock); + } + + async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise { + const repo = dbTx.manager.getRepository(IPLDBlock); + + return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock); + } + + async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise { + const repo = dbTx.manager.getRepository(IPLDBlock); + + await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind); + } + + async getIPLDStatus (): Promise { + const repo = this._conn.getRepository(IpldStatus); + + return this._baseDatabase.getIPLDStatus(repo); + } + async getBalance ({ blockHash, token, owner }: { blockHash: string, token: string, owner: string }): Promise { return this._conn.getRepository(Balance) .createQueryBuilder('balance') @@ -107,11 +150,17 @@ export class Database { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { @@ -173,6 +222,12 @@ export class Database { return this._baseDatabase.getBlockProgressEntities(repo, where, options); } + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); + } + async updateBlockProgress (queryRunner: QueryRunner, block: BlockProgress, lastProcessedEventIndex: number): Promise { const repo = queryRunner.manager.getRepository(BlockProgress); diff --git a/packages/erc20-watcher/src/entity/IPLDBlock.ts b/packages/erc20-watcher/src/entity/IPLDBlock.ts new file mode 100644 index 00000000..bff1118c --- /dev/null +++ b/packages/erc20-watcher/src/entity/IPLDBlock.ts @@ -0,0 +1,36 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm'; + +import { StateKind } from '@cerc-io/util'; + +import { BlockProgress } from './BlockProgress'; + +@Entity() +@Index(['cid'], { unique: true }) +@Index(['block', 'contractAddress']) +@Index(['block', 'contractAddress', 'kind'], { unique: true }) +export class IPLDBlock { + @PrimaryGeneratedColumn() + id!: number; + + @ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' }) + block!: BlockProgress; + + @Column('varchar', { length: 42 }) + contractAddress!: string; + + @Column('varchar') + cid!: string; + + @Column({ + type: 'enum', + enum: StateKind + }) + kind!: StateKind; + + @Column('bytea') + data!: Buffer; +} diff --git a/packages/erc20-watcher/src/entity/IpldStatus.ts b/packages/erc20-watcher/src/entity/IpldStatus.ts new file mode 100644 index 00000000..fb81069e --- /dev/null +++ b/packages/erc20-watcher/src/entity/IpldStatus.ts @@ -0,0 +1,20 @@ +// +// Copyright 2022 Vulcanize, Inc. +// + +import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm'; + +@Entity() +export class IpldStatus { + @PrimaryGeneratedColumn() + id!: number; + + @Column('integer') + latestHooksBlockNumber!: number; + + @Column('integer', { nullable: true }) + latestCheckpointBlockNumber!: number; + + @Column('integer', { nullable: true }) + latestIPFSBlockNumber!: number; +} diff --git a/packages/erc20-watcher/src/indexer.ts b/packages/erc20-watcher/src/indexer.ts index 027ac664..7a6877f6 100644 --- a/packages/erc20-watcher/src/indexer.ts +++ b/packages/erc20-watcher/src/indexer.ts @@ -12,7 +12,7 @@ import { BaseProvider } from '@ethersproject/providers'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; -import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig } from '@cerc-io/util'; +import { IndexerInterface, Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, JobQueue, Where, QueryOptions, ServerConfig, IPFSClient, IpldStatus as IpldStatusInterface } from '@cerc-io/util'; import { Database } from './database'; import { Event } from './entity/Event'; @@ -21,6 +21,7 @@ import { SyncStatus } from './entity/SyncStatus'; import artifacts from './artifacts/ERC20.json'; import { BlockProgress } from './entity/BlockProgress'; import { Contract } from './entity/Contract'; +import { IPLDBlock } from './entity/IPLDBlock'; const log = debug('vulcanize:indexer'); const JSONbigNative = JSONbig({ useNativeBigInt: true }); @@ -63,7 +64,8 @@ export class Indexer implements IndexerInterface { this._ethProvider = ethProvider; this._serverConfig = serverConfig; this._serverMode = serverConfig.mode; - this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._ethProvider, jobQueue); + const ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); + this._baseIndexer = new BaseIndexer(serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, ipfsClient); const { abi, storageLayout } = artifacts; @@ -248,6 +250,10 @@ export class Indexer implements IndexerInterface { ); } + getIPLDData (ipldBlock: IPLDBlock): any { + return this._baseIndexer.getIPLDData(ipldBlock); + } + async triggerIndexingOnEvent (event: Event): Promise { const { eventName, eventInfo, contract: token, block: { blockHash } } = event; const eventFields = JSON.parse(eventInfo); @@ -278,6 +284,11 @@ export class Indexer implements IndexerInterface { await this.triggerIndexingOnEvent(event); } + async processBlock (blockProgress: BlockProgress): Promise { + // Call a function to create initial state for contracts. + await this._baseIndexer.createInit(this, blockProgress.blockHash, blockProgress.blockNumber); + } + parseEventNameAndArgs (kind: string, logObj: any): any { const { topics, data } = logObj; const logDescription = this._contract.parseLog({ data, topics }); @@ -291,7 +302,7 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -299,6 +310,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } + async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { + await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus); + } + cacheContract (contract: Contract): void { return this._baseIndexer.cacheContract(contract); } @@ -351,8 +366,8 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -451,7 +466,7 @@ export class Indexer implements IndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/erc721-watcher/src/database.ts b/packages/erc721-watcher/src/database.ts index 2bb0e5f1..a8c57866 100644 --- a/packages/erc721-watcher/src/database.ts +++ b/packages/erc721-watcher/src/database.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, FindOneOptions, LessThanOrEqual } from 'typeorm'; import path from 'path'; -import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -30,7 +30,7 @@ import { _TokenApprovals } from './entity/_TokenApprovals'; import { _OperatorApprovals } from './entity/_OperatorApprovals'; import { TransferCount } from './entity/TransferCount'; -export class Database implements IPLDDatabaseInterface { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -398,11 +398,23 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); + } + + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/erc721-watcher/src/hooks.ts b/packages/erc721-watcher/src/hooks.ts index 9865888e..a4091cb1 100644 --- a/packages/erc721-watcher/src/hooks.ts +++ b/packages/erc721-watcher/src/hooks.ts @@ -38,7 +38,7 @@ export async function createInitialState (indexer: Indexer, contractAddress: str /** * Hook function to create state diff. - * @param indexer Indexer instance that contains methods to fetch the contract varaiable values. + * @param indexer Indexer instance that contains methods to fetch the contract variable values. * @param blockHash Block hash of the concerned block. */ export async function createStateDiff (indexer: Indexer, blockHash: string): Promise { diff --git a/packages/erc721-watcher/src/indexer.ts b/packages/erc721-watcher/src/indexer.ts index eaa5daa1..fd8b7f04 100644 --- a/packages/erc721-watcher/src/indexer.ts +++ b/packages/erc721-watcher/src/indexer.ts @@ -14,8 +14,8 @@ import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { - IPLDIndexer as BaseIndexer, - IPLDIndexerInterface, + Indexer as BaseIndexer, + IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, @@ -27,7 +27,8 @@ import { BlockHeight, IPFSClient, StateKind, - IpldStatus as IpldStatusInterface + IpldStatus as IpldStatusInterface, + ResultIPLDBlock } from '@cerc-io/util'; import ERC721Artifacts from './artifacts/ERC721.json'; @@ -70,21 +71,7 @@ export type ResultEvent = { proof: string; }; -export type ResultIPLDBlock = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - contractAddress: string; - cid: string; - kind: string; - data: string; -}; - -export class Indexer implements IPLDIndexerInterface { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _ethProvider: BaseProvider @@ -907,7 +894,7 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -959,8 +946,8 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -1098,7 +1085,7 @@ export class Indexer implements IPLDIndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/graph-node/environments/compare-cli-config.toml b/packages/graph-node/environments/compare-cli-config.toml index 0d450bc4..99e24826 100644 --- a/packages/graph-node/environments/compare-cli-config.toml +++ b/packages/graph-node/environments/compare-cli-config.toml @@ -13,7 +13,7 @@ entitiesDir = "../../graph-test-watcher/dist/entity/*" endpoint = "gqlEndpoint2" verifyState = true - derivedFields = [] + skipFields = [] [cache] endpoint = "gqlEndpoint1" diff --git a/packages/graph-node/package.json b/packages/graph-node/package.json index c6013ea8..73bf306f 100644 --- a/packages/graph-node/package.json +++ b/packages/graph-node/package.json @@ -3,6 +3,7 @@ "version": "0.1.0", "main": "dist/index.js", "license": "AGPL-3.0", + "private": true, "devDependencies": { "@graphprotocol/graph-ts": "^0.22.0", "@nomiclabs/hardhat-ethers": "^2.0.2", diff --git a/packages/graph-node/src/cli/compare/compare-blocks.ts b/packages/graph-node/src/cli/compare/compare-blocks.ts index 2d806bf6..5d016578 100644 --- a/packages/graph-node/src/cli/compare/compare-blocks.ts +++ b/packages/graph-node/src/cli/compare/compare-blocks.ts @@ -12,7 +12,7 @@ import _ from 'lodash'; import { getConfig as getWatcherConfig, wait } from '@cerc-io/util'; import { GraphQLClient } from '@cerc-io/ipld-eth-client'; -import { checkEntityInIPLDState, compareQuery, Config, getIPLDsByBlock, checkIPLDMetaData, combineIPLDState, getClients, getConfig } from './utils'; +import { checkGQLEntityInIPLDState, compareQuery, Config, getIPLDsByBlock, checkIPLDMetaData, combineIPLDState, getClients, getConfig, checkGQLEntitiesInIPLDState } from './utils'; import { Database } from '../../database'; import { getSubgraphConfig } from '../../utils'; @@ -168,7 +168,7 @@ export const main = async (): Promise => { ); if (config.watcher.verifyState) { - const ipldDiff = await checkEntityInIPLDState(ipldStateByBlock, queryName, result, id, rawJson, config.watcher.derivedFields); + const ipldDiff = await checkGQLEntityInIPLDState(ipldStateByBlock, entityName, result[queryName], id, rawJson, config.watcher.skipFields); if (ipldDiff) { log('Results mismatch for IPLD state:', ipldDiff); @@ -182,13 +182,24 @@ export const main = async (): Promise => { } } else { if (updatedEntities.has(entityName)) { - ({ diff: resultDiff } = await compareQuery( + let result; + + ({ diff: resultDiff, result1: result } = await compareQuery( clients, queryName, { block }, rawJson, timeDiff )); + + if (config.watcher.verifyState) { + const ipldDiff = await checkGQLEntitiesInIPLDState(ipldStateByBlock, entityName, result[queryName], rawJson, config.watcher.skipFields); + + if (ipldDiff) { + log('Results mismatch for IPLD state:', ipldDiff); + diffFound = true; + } + } } } diff --git a/packages/graph-node/src/cli/compare/utils.ts b/packages/graph-node/src/cli/compare/utils.ts index 95ca3c4d..d6aecd1a 100644 --- a/packages/graph-node/src/cli/compare/utils.ts +++ b/packages/graph-node/src/cli/compare/utils.ts @@ -50,7 +50,7 @@ interface QueryConfig { queryLimits: { [queryName: string]: number } } -interface EntityDerivedFields { +interface EntitySkipFields { entity: string; fields: string[]; } @@ -63,7 +63,7 @@ export interface Config { entitiesDir: string; verifyState: boolean; endpoint: keyof EndpointConfig; - derivedFields: EntityDerivedFields[] + skipFields: EntitySkipFields[] } cache: { endpoint: keyof EndpointConfig; @@ -312,34 +312,69 @@ export const combineIPLDState = (contractIPLDs: {[key: string]: any}[]): {[key: return contractIPLDStates.reduce((acc, state) => _.merge(acc, state)); }; -export const checkEntityInIPLDState = async ( +export const checkGQLEntityInIPLDState = async ( ipldState: {[key: string]: any}, - queryName: string, + entityName: string, entityResult: {[key: string]: any}, id: string, rawJson: boolean, - derivedFields: EntityDerivedFields[] = [] + skipFields: EntitySkipFields[] = [] ): Promise => { - const entityName = _.upperFirst(queryName); const ipldEntity = ipldState[entityName][id]; // Filter __typename key in GQL result. - const resultEntity = omitDeep(entityResult[queryName], '__typename'); + entityResult = omitDeep(entityResult, '__typename'); - // Filter derived fields in GQL result. - derivedFields.forEach(({ entity, fields }) => { + // Filter skipped fields in state comaparison. + skipFields.forEach(({ entity, fields }) => { if (entityName === entity) { - fields.forEach(field => { - delete resultEntity[field]; - }); + omitDeep(entityResult, fields); + omitDeep(ipldEntity, fields); } }); - const diff = compareObjects(resultEntity, ipldEntity, rawJson); + const diff = compareObjects(entityResult, ipldEntity, rawJson); return diff; }; +export const checkGQLEntitiesInIPLDState = async ( + ipldState: {[key: string]: any}, + entityName: string, + entitiesResult: any[], + rawJson: boolean, + skipFields: EntitySkipFields[] = [] +): Promise => { + // Form entities from state to compare with GQL result + const stateEntities = ipldState[entityName]; + + for (const entityResult of entitiesResult) { + const stateEntity = stateEntities[entityResult.id]; + + // Verify state if entity from GQL result is present in state. + if (stateEntity) { + // Filter __typename key in GQL result. + entitiesResult = omitDeep(entityResult, '__typename'); + + // Filter skipped fields in state comaparison. + skipFields.forEach(({ entity, fields }) => { + if (entityName === entity) { + omitDeep(entityResult, fields); + omitDeep(stateEntity, fields); + } + }); + + const diff = compareObjects(entityResult, stateEntity, rawJson); + + if (diff) { + return diff; + } + } + } + + return ''; +}; + // obj1: expected // obj2: actual const compareObjects = (obj1: any, obj2: any, rawJson: boolean): string => { diff --git a/packages/graph-node/src/loader.ts b/packages/graph-node/src/loader.ts index 6352ebc6..93d1dde1 100644 --- a/packages/graph-node/src/loader.ts +++ b/packages/graph-node/src/loader.ts @@ -100,6 +100,7 @@ export const instantiate = async ( database.cacheUpdatedEntity(entityName, dbEntity); // Update the in-memory subgraph state if not disabled. + // TODO: enableSubgraphState if (!indexer.serverConfig.disableSubgraphState) { // Prepare diff data for the entity update assert(indexer.getRelationsMap); diff --git a/packages/graph-node/src/watcher.ts b/packages/graph-node/src/watcher.ts index 787aebf9..1e8e35fb 100644 --- a/packages/graph-node/src/watcher.ts +++ b/packages/graph-node/src/watcher.ts @@ -12,7 +12,7 @@ import { SelectionNode } from 'graphql'; import { ResultObject } from '@vulcanize/assemblyscript/lib/loader'; import { EthClient } from '@cerc-io/ipld-eth-client'; -import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IPLDIndexerInterface, BlockProgressInterface } from '@cerc-io/util'; +import { getFullBlock, BlockHeight, ServerConfig, getFullTransaction, QueryOptions, IPLDBlockInterface, IndexerInterface, BlockProgressInterface } from '@cerc-io/util'; import { createBlock, createEvent, getSubgraphConfig, resolveEntityFieldConflicts, Transaction } from './utils'; import { Context, GraphData, instantiate } from './loader'; @@ -28,7 +28,7 @@ interface DataSource { export class GraphWatcher { _database: Database; - _indexer?: IPLDIndexerInterface; + _indexer?: IndexerInterface; _ethClient: EthClient; _ethProvider: providers.BaseProvider; _subgraphPath: string; @@ -254,7 +254,7 @@ export class GraphWatcher { } } - setIndexer (indexer: IPLDIndexerInterface): void { + setIndexer (indexer: IndexerInterface): void { this._indexer = indexer; } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 9fd6f4c1..5ec91aeb 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -7,7 +7,10 @@ import { EventInterface, SyncStatusInterface, ServerConfig as ServerConfigInterface, - ValueResult + ValueResult, + ContractInterface, + IpldStatus as IpldStatusInterface, + IPLDBlockInterface } from '@cerc-io/util'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; @@ -85,7 +88,7 @@ export class Indexer implements IndexerInterface { return ''; } - async fetchBlockEvents (block: BlockProgressInterface): Promise { + async fetchBlockWithEvents (block: BlockProgressInterface): Promise { return block; } @@ -153,6 +156,22 @@ export class Indexer implements IndexerInterface { async processEvent (event: EventInterface): Promise { assert(event); } + + isWatchedContract (address : string): ContractInterface | undefined { + return undefined; + } + + async processBlock (blockProgress: BlockProgressInterface): Promise { + return undefined; + } + + getIPLDData (ipldBlock: IPLDBlockInterface): any { + return undefined; + } + + async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { + return undefined; + } } class SyncStatus implements SyncStatusInterface { diff --git a/packages/graph-test-watcher/src/database.ts b/packages/graph-test-watcher/src/database.ts index 4f8409a4..d1fd69ff 100644 --- a/packages/graph-test-watcher/src/database.ts +++ b/packages/graph-test-watcher/src/database.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm'; import path from 'path'; -import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -18,7 +18,7 @@ import { IPLDBlock } from './entity/IPLDBlock'; import { GetMethod } from './entity/GetMethod'; import { _Test } from './entity/_Test'; -export class Database implements IPLDDatabaseInterface { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -171,11 +171,23 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); + } + + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index 6782d298..94e0b6b4 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -16,7 +16,7 @@ import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { StorageLayout, MappingKey } from '@cerc-io/solidity-mapper'; import { - IPLDIndexer as BaseIndexer, + Indexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, @@ -27,8 +27,9 @@ import { BlockHeight, IPFSClient, StateKind, - IPLDIndexerInterface, - IpldStatus as IpldStatusInterface + IndexerInterface, + IpldStatus as IpldStatusInterface, + ResultIPLDBlock } from '@cerc-io/util'; import { GraphWatcher } from '@cerc-io/graph-node'; @@ -74,21 +75,7 @@ export type ResultEvent = { proof: string; }; -export type ResultIPLDBlock = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - contractAddress: string; - cid: string; - kind: string; - data: string; -}; - -export class Indexer implements IPLDIndexerInterface { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _ethProvider: BaseProvider @@ -536,7 +523,7 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -588,8 +575,8 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -820,7 +807,7 @@ export class Indexer implements IPLDIndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/ipld-eth-client/.npmignore b/packages/ipld-eth-client/.npmignore new file mode 100644 index 00000000..2739a4b8 --- /dev/null +++ b/packages/ipld-eth-client/.npmignore @@ -0,0 +1,5 @@ +/src/ +index.ts +tsconfig.json +.eslintrc.json +.eslintignore diff --git a/packages/ipld-eth-client/package.json b/packages/ipld-eth-client/package.json index 58a9a570..56970d6b 100644 --- a/packages/ipld-eth-client/package.json +++ b/packages/ipld-eth-client/package.json @@ -2,7 +2,6 @@ "name": "@cerc-io/ipld-eth-client", "version": "0.1.0", "description": "IPLD ETH Client", - "private": true, "main": "dist/index.js", "scripts": { "lint": "eslint .", diff --git a/packages/mobymask-watcher/src/database.ts b/packages/mobymask-watcher/src/database.ts index 73c3af12..b000e6f5 100644 --- a/packages/mobymask-watcher/src/database.ts +++ b/packages/mobymask-watcher/src/database.ts @@ -6,7 +6,7 @@ import assert from 'assert'; import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, LessThanOrEqual } from 'typeorm'; import path from 'path'; -import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; +import { Database as BaseDatabase, DatabaseInterface, QueryOptions, StateKind, Where } from '@cerc-io/util'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; @@ -20,7 +20,7 @@ import { IsRevoked } from './entity/IsRevoked'; import { IsPhisher } from './entity/IsPhisher'; import { IsMember } from './entity/IsMember'; -export class Database implements IPLDDatabaseInterface { +export class Database implements DatabaseInterface { _config: ConnectionOptions; _conn!: Connection; _baseDatabase: BaseDatabase; @@ -230,11 +230,23 @@ export class Database implements IPLDDatabaseInterface { return this._baseDatabase.getBlockEvents(repo, blockHash, where, queryOptions); } - async saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise { const blockRepo = queryRunner.manager.getRepository(BlockProgress); const eventRepo = queryRunner.manager.getRepository(Event); - return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events); + return this._baseDatabase.saveBlockWithEvents(blockRepo, eventRepo, block, events); + } + + async saveEvents (queryRunner: QueryRunner, events: Event[]): Promise { + const eventRepo = queryRunner.manager.getRepository(Event); + + return this._baseDatabase.saveEvents(eventRepo, events); + } + + async saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + + return this._baseDatabase.saveBlockProgress(repo, block); } async saveContract (queryRunner: QueryRunner, address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/mobymask-watcher/src/indexer.ts b/packages/mobymask-watcher/src/indexer.ts index 6ea575e1..98dae3b2 100644 --- a/packages/mobymask-watcher/src/indexer.ts +++ b/packages/mobymask-watcher/src/indexer.ts @@ -14,8 +14,8 @@ import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper'; import { - IPLDIndexer as BaseIndexer, - IPLDIndexerInterface, + Indexer as BaseIndexer, + IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, @@ -28,7 +28,8 @@ import { IPFSClient, StateKind, IpldStatus as IpldStatusInterface, - getFullTransaction + getFullTransaction, + ResultIPLDBlock } from '@cerc-io/util'; import PhisherRegistryArtifacts from './artifacts/PhisherRegistry.json'; @@ -75,21 +76,7 @@ export type ResultEvent = { proof: string; }; -export type ResultIPLDBlock = { - block: { - cid: string; - hash: string; - number: number; - timestamp: number; - parentHash: string; - }; - contractAddress: string; - cid: string; - kind: string; - data: string; -}; - -export class Indexer implements IPLDIndexerInterface { +export class Indexer implements IndexerInterface { _db: Database _ethClient: EthClient _ethProvider: JsonRpcProvider @@ -634,7 +621,7 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): Contract | undefined { return this._baseIndexer.isWatchedContract(address); } @@ -686,8 +673,8 @@ export class Indexer implements IPLDIndexerInterface { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } - async fetchBlockEvents (block: DeepPartial): Promise { - return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); + async fetchBlockWithEvents (block: DeepPartial): Promise { + return this._baseIndexer.fetchBlockWithEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { @@ -842,7 +829,7 @@ export class Indexer implements IPLDIndexerInterface { }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); - const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); + const blockProgress = await this._db.saveBlockWithEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); diff --git a/packages/solidity-mapper/.npmignore b/packages/solidity-mapper/.npmignore new file mode 100644 index 00000000..681e19cb --- /dev/null +++ b/packages/solidity-mapper/.npmignore @@ -0,0 +1,10 @@ +/src/ +hardhat.config.ts +tsconfig.json +.eslintrc.json +.eslintignore +.env +.env.example +cache +artifacts +test diff --git a/packages/test/package.json b/packages/test/package.json index d6d900c7..d9ecf50e 100644 --- a/packages/test/package.json +++ b/packages/test/package.json @@ -3,6 +3,7 @@ "version": "0.1.0", "main": "dist/index.js", "license": "AGPL-3.0", + "private": true, "scripts": { "lint": "eslint .", "build": "tsc", diff --git a/packages/util/.npmignore b/packages/util/.npmignore new file mode 100644 index 00000000..9cc8d99d --- /dev/null +++ b/packages/util/.npmignore @@ -0,0 +1,6 @@ +/src/ +hardhat.config.ts +index.ts +tsconfig.json +.eslintrc.json +.eslintignore diff --git a/packages/util/index.ts b/packages/util/index.ts index 37d3b544..ac18ea9f 100644 --- a/packages/util/index.ts +++ b/packages/util/index.ts @@ -14,8 +14,6 @@ export * from './src/indexer'; export * from './src/job-runner'; export * from './src/ipld-helper'; export * from './src/graph-decimal'; -export * from './src/ipld-indexer'; -export * from './src/ipld-database'; export * from './src/ipfs'; export * from './src/index-block'; export * from './src/metrics'; diff --git a/packages/util/src/config.ts b/packages/util/src/config.ts index 7e37d59a..d8a42112 100644 --- a/packages/util/src/config.ts +++ b/packages/util/src/config.ts @@ -22,6 +22,8 @@ export interface JobQueueConfig { maxCompletionLagInSecs: number; jobDelayInMilliSecs?: number; eventsInBatch: number; + lazyUpdateBlockProgress?: boolean; + subgraphEventsOrder: boolean; } export interface ServerConfig { diff --git a/packages/util/src/database.ts b/packages/util/src/database.ts index 99b6ae5e..5fb671f1 100644 --- a/packages/util/src/database.ts +++ b/packages/util/src/database.ts @@ -4,6 +4,7 @@ import assert from 'assert'; import { + Between, Connection, ConnectionOptions, createConnection, @@ -18,8 +19,9 @@ import { } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import _ from 'lodash'; +import { Pool } from 'pg'; -import { BlockProgressInterface, ContractInterface, EventInterface, SyncStatusInterface } from './types'; +import { BlockProgressInterface, ContractInterface, EventInterface, IPLDBlockInterface, IpldStatusInterface, StateKind, SyncStatusInterface } from './types'; import { MAX_REORG_DEPTH, UNKNOWN_EVENT_NAME } from './constants'; import { blockProgressCount, eventCount } from './metrics'; @@ -67,12 +69,22 @@ export type Relation = string | { property: string, alias: string } export class Database { _config: ConnectionOptions _conn!: Connection + _pgPool: Pool _blockCount = 0 _eventCount = 0 constructor (config: ConnectionOptions) { assert(config); this._config = config; + assert(config.type === 'postgres'); + + this._pgPool = new Pool({ + user: config.username, + host: config.host, + database: config.database, + password: config.password, + port: config.port + }); } async init (): Promise { @@ -167,6 +179,13 @@ export class Database { .getMany(); } + async saveBlockProgress (repo: Repository, block: DeepPartial): Promise { + this._blockCount++; + blockProgressCount.set(this._blockCount); + + return await repo.save(block); + } + async updateBlockProgress (repo: Repository, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise { if (!block.isComplete) { if (lastProcessedEventIndex <= block.lastProcessedEventIndex) { @@ -222,7 +241,7 @@ export class Database { return queryBuilder.getMany(); } - async saveEvents (blockRepo: Repository, eventRepo: Repository, block: DeepPartial, events: DeepPartial[]): Promise { + async saveBlockWithEvents (blockRepo: Repository, eventRepo: Repository, block: DeepPartial, events: DeepPartial[]): Promise { const { cid, blockHash, @@ -258,17 +277,18 @@ export class Database { this._blockCount++; blockProgressCount.set(this._blockCount); - let blockEventCount = 0; - // Bulk insert events. events.forEach(event => { event.block = blockProgress; - - if (event.eventName !== UNKNOWN_EVENT_NAME) { - blockEventCount++; - } }); + await this.saveEvents(eventRepo, events); + + return blockProgress; + } + + async saveEvents (eventRepo: Repository, events: DeepPartial[]): Promise { + // Bulk insert events. const eventBatches = _.chunk(events, INSERT_EVENTS_BATCH); const insertPromises = eventBatches.map(async events => { @@ -280,10 +300,8 @@ export class Database { }); await Promise.all(insertPromises); - this._eventCount += blockEventCount; + this._eventCount += events.filter(event => event.eventName !== UNKNOWN_EVENT_NAME).length; eventCount.set(this._eventCount); - - return blockProgress; } async getEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions): Promise { @@ -540,22 +558,246 @@ export class Database { return repo.save(entity); } - async _fetchBlockCount (): Promise { - this._blockCount = await this._conn.getRepository('block_progress') - .count(); + async getLatestIPLDBlock (repo: Repository, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { + let queryBuilder = repo.createQueryBuilder('ipld_block') + .leftJoinAndSelect('ipld_block.block', 'block') + .where('block.is_pruned = false') + .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) + .orderBy('block.block_number', 'DESC'); - blockProgressCount.set(this._blockCount); - } + // Filter out blocks after the provided block number. + if (blockNumber) { + queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber }); + } - async _fetchEventCount (): Promise { - this._eventCount = await this._conn.getRepository('event') - .count({ - where: { - eventName: Not(UNKNOWN_EVENT_NAME) + // Filter using kind if specified else avoid diff_staged block. + queryBuilder = kind + ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) + : queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged }); + + // Get the first three entries. + queryBuilder.limit(3); + + const results = await queryBuilder.getMany(); + + if (results.length) { + // Sort by (block number desc, id desc) to get the latest entry. + // At same height, IPLD blocks are expected in order ['init', 'diff', 'checkpoint'], + // and are given preference in order ['checkpoint', 'diff', 'init'] + results.sort((result1, result2) => { + if (result1.block.blockNumber === result2.block.blockNumber) { + return (result1.id > result2.id) ? -1 : 1; + } else { + return (result1.block.blockNumber > result2.block.blockNumber) ? -1 : 1; } }); - eventCount.set(this._eventCount); + return results[0]; + } + } + + async getPrevIPLDBlock (repo: Repository, blockHash: string, contractAddress: string, kind?: string): Promise { + const heirerchicalQuery = ` + WITH RECURSIVE cte_query AS + ( + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + 1 as depth, + i.id, + i.kind + FROM + block_progress b + LEFT JOIN + ipld_block i ON i.block_id = b.id + AND i.contract_address = $2 + WHERE + b.block_hash = $1 + UNION ALL + SELECT + b.block_hash, + b.block_number, + b.parent_hash, + c.depth + 1, + i.id, + i.kind + FROM + block_progress b + LEFT JOIN + ipld_block i + ON i.block_id = b.id + AND i.contract_address = $2 + INNER JOIN + cte_query c ON c.parent_hash = b.block_hash + WHERE + c.depth < $3 + ) + SELECT + block_number, id, kind + FROM + cte_query + ORDER BY block_number DESC, id DESC + `; + + // Fetching block and id for previous IPLDBlock in frothy region. + const queryResult = await repo.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]); + const latestRequiredResult = kind + ? queryResult.find((obj: any) => obj.kind === kind) + : queryResult.find((obj: any) => obj.id); + + let result: IPLDBlockInterface | undefined; + + if (latestRequiredResult) { + result = await repo.findOne(latestRequiredResult.id, { relations: ['block'] }); + } else { + // If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region. + // Filter out IPLDBlocks from pruned blocks. + const canonicalBlockNumber = queryResult.pop().block_number + 1; + + let queryBuilder = repo.createQueryBuilder('ipld_block') + .leftJoinAndSelect('ipld_block.block', 'block') + .where('block.is_pruned = false') + .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) + .andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) + .orderBy('block.block_number', 'DESC'); + + // Filter using kind if specified else order by id to give preference to checkpoint. + queryBuilder = kind + ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) + : queryBuilder.addOrderBy('ipld_block.id', 'DESC'); + + // Get the first entry. + queryBuilder.limit(1); + + result = await queryBuilder.getOne(); + } + + return result; + } + + async getIPLDBlocks (repo: Repository, where: FindConditions): Promise { + return repo.find({ where, relations: ['block'] }); + } + + async getDiffIPLDBlocksInRange (repo: Repository, contractAddress: string, startblock: number, endBlock: number): Promise { + return repo.find({ + relations: ['block'], + where: { + contractAddress, + kind: StateKind.Diff, + block: { + isPruned: false, + blockNumber: Between(startblock + 1, endBlock) + } + }, + order: { + block: 'ASC' + } + }); + } + + async saveOrUpdateIPLDBlock (repo: Repository, ipldBlock: IPLDBlockInterface): Promise { + let updatedData: {[key: string]: any}; + + console.time('time:ipld-database#saveOrUpdateIPLDBlock-DB-query'); + if (ipldBlock.id) { + // Using pg query as workaround for typeorm memory issue when saving checkpoint with large sized data. + const { rows } = await this._pgPool.query(` + UPDATE ipld_block + SET block_id = $1, contract_address = $2, cid = $3, kind = $4, data = $5 + WHERE id = $6 + RETURNING * + `, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data, ipldBlock.id]); + + updatedData = rows[0]; + } else { + const { rows } = await this._pgPool.query(` + INSERT INTO ipld_block(block_id, contract_address, cid, kind, data) + VALUES($1, $2, $3, $4, $5) + RETURNING * + `, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data]); + + updatedData = rows[0]; + } + console.timeEnd('time:ipld-database#saveOrUpdateIPLDBlock-DB-query'); + + assert(updatedData); + return { + block: ipldBlock.block, + contractAddress: updatedData.contract_address, + cid: updatedData.cid, + kind: updatedData.kind, + data: updatedData.data, + id: updatedData.id + }; + } + + async removeIPLDBlocks (repo: Repository, blockNumber: number, kind: string): Promise { + const entities = await repo.find({ relations: ['block'], where: { block: { blockNumber }, kind } }); + + // Delete if entities found. + if (entities.length) { + await repo.delete(entities.map((entity) => entity.id)); + } + } + + async removeIPLDBlocksAfterBlock (repo: Repository, blockNumber: number): Promise { + // Use raw SQL as TypeORM curently doesn't support delete via 'join' or 'using' + const deleteQuery = ` + DELETE FROM + ipld_block + USING block_progress + WHERE + ipld_block.block_id = block_progress.id + AND block_progress.block_number > $1; + `; + + await repo.query(deleteQuery, [blockNumber]); + } + + async getIPLDStatus (repo: Repository): Promise { + return repo.findOne(); + } + + async updateIPLDStatusHooksBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { + let entity = await repo.findOne(); + + if (!entity) { + entity = repo.create({ + latestHooksBlockNumber: blockNumber, + latestCheckpointBlockNumber: -1, + latestIPFSBlockNumber: -1 + }); + } + + if (force || blockNumber > entity.latestHooksBlockNumber) { + entity.latestHooksBlockNumber = blockNumber; + } + + return repo.save(entity); + } + + async updateIPLDStatusCheckpointBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { + const entity = await repo.findOne(); + assert(entity); + + if (force || blockNumber > entity.latestCheckpointBlockNumber) { + entity.latestCheckpointBlockNumber = blockNumber; + } + + return repo.save(entity); + } + + async updateIPLDStatusIPFSBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { + const entity = await repo.findOne(); + assert(entity); + + if (force || blockNumber > entity.latestIPFSBlockNumber) { + entity.latestIPFSBlockNumber = blockNumber; + } + + return repo.save(entity); } buildQuery (repo: Repository, selectQueryBuilder: SelectQueryBuilder, where: Where = {}): SelectQueryBuilder { @@ -622,7 +864,8 @@ export class Database { orderQuery ( repo: Repository, selectQueryBuilder: SelectQueryBuilder, - orderOptions: { orderBy?: string, orderDirection?: string } + orderOptions: { orderBy?: string, orderDirection?: string }, + columnPrefix = '' ): SelectQueryBuilder { const { orderBy, orderDirection } = orderOptions; assert(orderBy); @@ -631,8 +874,26 @@ export class Database { assert(columnMetadata); return selectQueryBuilder.addOrderBy( - `${selectQueryBuilder.alias}.${columnMetadata.propertyAliasName}`, + `"${selectQueryBuilder.alias}"."${columnPrefix}${columnMetadata.databaseName}"`, orderDirection === 'desc' ? 'DESC' : 'ASC' ); } + + async _fetchBlockCount (): Promise { + this._blockCount = await this._conn.getRepository('block_progress') + .count(); + + blockProgressCount.set(this._blockCount); + } + + async _fetchEventCount (): Promise { + this._eventCount = await this._conn.getRepository('event') + .count({ + where: { + eventName: Not(UNKNOWN_EVENT_NAME) + } + }); + + eventCount.set(this._eventCount); + } } diff --git a/packages/util/src/fill.ts b/packages/util/src/fill.ts index 63b5522a..e85faba0 100644 --- a/packages/util/src/fill.ts +++ b/packages/util/src/fill.ts @@ -130,7 +130,7 @@ const prefetchBlocks = async ( const blockProgress = await indexer.getBlockProgress(blockHash); if (!blockProgress) { - await indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + await indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); } }); diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index 17ee7f3a..31375a5c 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -36,7 +36,7 @@ export const indexBlock = async ( // Check if blockProgress fetched from database. if (!partialblockProgress.id) { - blockProgress = await indexer.fetchBlockEvents(partialblockProgress); + blockProgress = await indexer.fetchBlockWithEvents(partialblockProgress); } else { blockProgress = partialblockProgress as BlockProgressInterface; } diff --git a/packages/util/src/indexer.ts b/packages/util/src/indexer.ts index c6d05ee5..4579a2d6 100644 --- a/packages/util/src/indexer.ts +++ b/packages/util/src/indexer.ts @@ -6,14 +6,20 @@ import assert from 'assert'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import debug from 'debug'; import { ethers } from 'ethers'; +import _ from 'lodash'; +import { sha256 } from 'multiformats/hashes/sha2'; +import { CID } from 'multiformats/cid'; +import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@cerc-io/ipld-eth-client'; import { GetStorageAt, getStorageValue, StorageLayout } from '@cerc-io/solidity-mapper'; -import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface } from './types'; +import { BlockProgressInterface, DatabaseInterface, EventInterface, SyncStatusInterface, ContractInterface, IPLDBlockInterface, IndexerInterface, StateKind } from './types'; import { UNKNOWN_EVENT_NAME, JOB_KIND_CONTRACT, QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; import { Where, QueryOptions } from './database'; +import { ServerConfig } from './config'; +import { IPFSClient } from './ipfs'; const DEFAULT_MAX_EVENTS_BLOCK_RANGE = 1000; @@ -26,20 +32,54 @@ export interface ValueResult { } } +export interface IpldStatus { + init?: number; + diff?: number; + checkpoint?: number; + // eslint-disable-next-line camelcase + diff_staged?: number; +} + +export type ResultIPLDBlock = { + block: { + cid: string; + hash: string; + number: number; + timestamp: number; + parentHash: string; + }; + contractAddress: string; + cid: string; + kind: string; + data: string; +}; + export class Indexer { + _serverConfig: ServerConfig; _db: DatabaseInterface; _ethClient: EthClient; _getStorageAt: GetStorageAt; _ethProvider: ethers.providers.BaseProvider; _jobQueue: JobQueue; + _ipfsClient: IPFSClient; _watchedContracts: { [key: string]: ContractInterface } = {}; + _ipldStatusMap: { [key: string]: IpldStatus } = {}; - constructor (db: DatabaseInterface, ethClient: EthClient, ethProvider: ethers.providers.BaseProvider, jobQueue: JobQueue) { + constructor ( + serverConfig: ServerConfig, + db: DatabaseInterface, + ethClient: EthClient, + ethProvider: ethers.providers.BaseProvider, + jobQueue: JobQueue, + ipfsClient: IPFSClient + ) { + this._serverConfig = serverConfig; this._db = db; this._ethClient = ethClient; this._ethProvider = ethProvider; this._jobQueue = jobQueue; + this._ipfsClient = ipfsClient; this._getStorageAt = this._ethClient.getStorageAt.bind(this._ethClient); } @@ -192,7 +232,7 @@ export class Indexer { return this._db.getEvent(id); } - async fetchBlockEvents (block: DeepPartial, fetchAndSaveEvents: (block: DeepPartial) => Promise): Promise { + async fetchBlockWithEvents (block: DeepPartial, fetchAndSaveEvents: (block: DeepPartial) => Promise): Promise { assert(block.blockHash); log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); @@ -202,6 +242,35 @@ export class Indexer { return blockProgress; } + async fetchBlockEvents (block: DeepPartial, fetchEvents: (block: DeepPartial) => Promise[]>): Promise[]> { + assert(block.blockHash); + + log(`getBlockEvents: fetching from upstream server ${block.blockHash}`); + console.time('time:indexer#fetchBlockEvents-fetchAndSaveEvents'); + const events = await fetchEvents(block); + console.timeEnd('time:indexer#fetchBlockEvents-fetchAndSaveEvents'); + log(`getBlockEvents: fetched for block: ${block.blockHash} num events: ${events.length}`); + + return events; + } + + async saveBlockProgress (block: DeepPartial): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.saveBlockProgress(dbTx, block); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; + } + async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { return this._db.getBlockEvents(blockHash, where, queryOptions); } @@ -288,6 +357,20 @@ export class Indexer { return res; } + async saveEvents (dbEvents: EventInterface[]): Promise { + const dbTx = await this._db.createTransactionRunner(); + + try { + await this._db.saveEvents(dbTx, dbEvents); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { return this._db.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); } @@ -304,7 +387,7 @@ export class Indexer { return this._db.getEventsInRange(fromBlockNumber, toBlockNumber); } - async isWatchedContract (address : string): Promise { + isWatchedContract (address : string): ContractInterface | undefined { return this._watchedContracts[address]; } @@ -364,6 +447,453 @@ export class Indexer { ); } + getIPLDData (ipldBlock: IPLDBlockInterface): any { + return codec.decode(Buffer.from(ipldBlock.data)); + } + + async pushToIPFS (data: any): Promise { + await this._ipfsClient.push(data); + } + + isIPFSConfigured (): boolean { + const ipfsAddr = this._serverConfig.ipfsApiAddr; + + // Return false if ipfsAddr is undefined | null | empty string. + return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== ''); + } + + async getLatestHooksProcessedBlock (): Promise { + // Get current hookStatus. + const ipldStatus = await this._db.getIPLDStatus(); + assert(ipldStatus, 'IPLD status not found'); + + // Get all the blocks at height hookStatus.latestProcessedBlockNumber. + const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false); + + // There can exactly one block at hookStatus.latestProcessedBlockNumber height. + assert(blocksAtHeight.length === 1); + + return blocksAtHeight[0]; + } + + async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise { + // Get all the contracts. + const contracts = Object.values(this._watchedContracts); + + // Getting the block for checkpoint. + const block = await this.getBlockProgress(blockHash); + assert(block); + + // For each contract, merge the diff till now to create a checkpoint. + for (const contract of contracts) { + // Get IPLD status for the contract. + const ipldStatus = this._ipldStatusMap[contract.address]; + assert(ipldStatus, `IPLD status for contract ${contract.address} not found`); + + const initBlockNumber = ipldStatus.init; + + // Check if contract has checkpointing on. + // Check if it's time for a checkpoint or the init is in current block. + if ( + contract.checkpoint && + (block.blockNumber % checkpointInterval === 0 || initBlockNumber === block.blockNumber) + ) { + await this.createCheckpoint(indexer, contract.address, block); + } + } + } + + async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise { + // Getting the block for checkpoint. + let block; + + if (blockHash) { + block = await this.getBlockProgress(blockHash); + } else { + // In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint. + block = await this.getLatestHooksProcessedBlock(); + } + + assert(block); + + const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, block); + assert(checkpointBlockHash, 'Checkpoint not created'); + + // Push checkpoint to IPFS if configured. + if (this.isIPFSConfigured()) { + const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: StateKind.Checkpoint }); + + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. + assert(checkpointIPLDBlocks.length <= 1); + const checkpointIPLDBlock = checkpointIPLDBlocks[0]; + + const checkpointData = this.getIPLDData(checkpointIPLDBlock); + await this.pushToIPFS(checkpointData); + } + + return checkpointBlockHash; + } + + async createStateCheckpoint (contractAddress: string, block: BlockProgressInterface, data: any): Promise { + // Get the contract. + const contract = this._watchedContracts[contractAddress]; + assert(contract, `Contract ${contractAddress} not watched`); + + if (block.blockNumber < contract.startingBlock) { + return; + } + + // Create a checkpoint from the hook data without being concerned about diffs. + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Checkpoint); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + + async createInit ( + indexer: IndexerInterface, + blockHash: string, + blockNumber: number + ): Promise { + // Get all the contracts. + const contracts = Object.values(this._watchedContracts); + + // Create an initial state for each contract. + for (const contract of contracts) { + // Check if contract has checkpointing on. + if (contract.checkpoint) { + // Check if starting block not reached yet. + if (blockNumber < contract.startingBlock) { + continue; + } + + // Get IPLD status for the contract. + const ipldStatus = this._ipldStatusMap[contract.address]; + assert(ipldStatus, `IPLD status for contract ${contract.address} not found`); + + // Check if a 'init' IPLDBlock already exists. + // Or if a 'diff' IPLDBlock already exists. + // Or if a 'checkpoint' IPLDBlock already exists. + // (A watcher with imported state won't have an init IPLDBlock, but it will have the imported checkpoint) + if ( + ipldStatus.init || + ipldStatus.diff || + ipldStatus.checkpoint + ) { + continue; + } + + // Call initial state hook. + assert(indexer.processInitialState); + const stateData = await indexer.processInitialState(contract.address, blockHash); + + const block = await this.getBlockProgress(blockHash); + assert(block); + + const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, StateKind.Init); + await this.saveOrUpdateIPLDBlock(ipldBlock); + + // Push initial state to IPFS if configured. + if (this.isIPFSConfigured()) { + const ipldData = this.getIPLDData(ipldBlock); + await this.pushToIPFS(ipldData); + } + } + } + } + + async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + // Get the contract. + const contract = this._watchedContracts[contractAddress]; + assert(contract, `Contract ${contractAddress} not watched`); + + if (block.blockNumber < contract.startingBlock) { + return; + } + + // Create a staged diff block. + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.DiffStaged); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + + async finalizeDiffStaged (blockHash: string): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + // Get all the staged diff blocks for the given blockHash. + const stagedBlocks = await this._db.getIPLDBlocks({ block, kind: StateKind.DiffStaged }); + + // For each staged block, create a diff block. + for (const stagedBlock of stagedBlocks) { + const data = codec.decode(Buffer.from(stagedBlock.data)); + await this.createDiff(stagedBlock.contractAddress, block, data); + } + + // Remove all the staged diff blocks for current blockNumber. + // (Including staged diff blocks associated with pruned blocks) + await this.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged); + } + + async createDiff (contractAddress: string, block: BlockProgressInterface, data: any): Promise { + // Get the contract. + const contract = this._watchedContracts[contractAddress]; + assert(contract, `Contract ${contractAddress} not watched`); + + if (block.blockNumber < contract.startingBlock) { + return; + } + + // Get IPLD status for the contract. + const ipldStatus = this._ipldStatusMap[contractAddress]; + assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`); + + // Get the latest checkpoint block number. + const checkpointBlockNumber = ipldStatus.checkpoint; + + if (!checkpointBlockNumber) { + // Get the initial state block number. + const initBlockNumber = ipldStatus.init; + + // There should be an initial state at least. + assert(initBlockNumber, 'No initial state found'); + } else if (checkpointBlockNumber === block.blockNumber) { + // Check if the latest checkpoint is in the same block if block number is same. + const checkpoint = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint); + assert(checkpoint); + + assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash'); + } + + const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Diff); + await this.saveOrUpdateIPLDBlock(ipldBlock); + } + + async createCheckpoint (indexer: IndexerInterface, contractAddress: string, currentBlock: BlockProgressInterface): Promise { + // Get the contract. + const contract = this._watchedContracts[contractAddress]; + assert(contract, `Contract ${contractAddress} not watched`); + + if (currentBlock.blockNumber < contract.startingBlock) { + return; + } + + // Make sure the block is marked complete. + assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete'); + + // Get current hookStatus. + const ipldStatus = await this._db.getIPLDStatus(); + assert(ipldStatus); + + // Make sure the hooks have been processed for the block. + assert(currentBlock.blockNumber <= ipldStatus.latestHooksBlockNumber, 'Block for a checkpoint should have hooks processed'); + + // Call state checkpoint hook and check if default checkpoint is disabled. + assert(indexer.processStateCheckpoint); + const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash); + + if (disableDefaultCheckpoint) { + // Return if default checkpoint is disabled. + // Return block hash for checkpoint CLI. + return currentBlock.blockHash; + } + + // Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it. + let prevNonDiffBlock: IPLDBlockInterface; + let diffStartBlockNumber: number; + const checkpointBlock = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber - 1); + + if (checkpointBlock) { + const checkpointBlockNumber = checkpointBlock.block.blockNumber; + + prevNonDiffBlock = checkpointBlock; + diffStartBlockNumber = checkpointBlockNumber; + + // Update IPLD status map with the latest checkpoint info. + // Essential while importing state as checkpoint at the snapshot block is added by import-state CLI. + // (job-runner won't have the updated ipld status) + this.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber }); + } else { + // There should be an initial state at least. + const initBlock = await this._db.getLatestIPLDBlock(contractAddress, StateKind.Init); + assert(initBlock, 'No initial state found'); + + prevNonDiffBlock = initBlock; + // Take block number previous to initial state block as the checkpoint is to be created in the same block. + diffStartBlockNumber = initBlock.block.blockNumber - 1; + } + + // Fetching all diff blocks after the latest 'checkpoint' | 'init'. + const diffBlocks = await this._db.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, currentBlock.blockNumber); + + const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any; + const data = { + state: prevNonDiffBlockData.state + }; + + for (const diffBlock of diffBlocks) { + const diff = codec.decode(Buffer.from(diffBlock.data)) as any; + data.state = _.merge(data.state, diff.state); + } + + const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, StateKind.Checkpoint); + await this.saveOrUpdateIPLDBlock(ipldBlock); + + return currentBlock.blockHash; + } + + async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise { + console.time('time:ipld-indexer#prepareIPLDBlock'); + let ipldBlock: IPLDBlockInterface; + + // Get IPLD status for the contract. + const ipldStatus = this._ipldStatusMap[contractAddress]; + assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`); + + // Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress to update. + let currentIPLDBlock: IPLDBlockInterface | undefined; + const prevIPLDBlockNumber = ipldStatus[kind]; + + // Fetch from DB for previous IPLD block or for checkpoint kind. + if (kind === 'checkpoint' || (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber)) { + const currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind }); + + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. + assert(currentIPLDBlocks.length <= 1); + currentIPLDBlock = currentIPLDBlocks[0]; + } + + if (currentIPLDBlock) { + // Update current IPLDBlock of same kind if it exists. + ipldBlock = currentIPLDBlock; + + // Update the data field. + const oldData = codec.decode(Buffer.from(ipldBlock.data)); + data = _.merge(oldData, data); + } else { + // Create a new IPLDBlock instance. + ipldBlock = this._db.getNewIPLDBlock(); + + // Fetch the parent IPLDBlock. + const parentIPLDBlock = await this._db.getLatestIPLDBlock(contractAddress, null, block.blockNumber); + + // Setting the meta-data for an IPLDBlock (done only once per IPLD block). + data.meta = { + id: contractAddress, + kind, + parent: { + '/': parentIPLDBlock ? parentIPLDBlock.cid : null + }, + ethBlock: { + cid: { + '/': block.cid + }, + num: block.blockNumber + } + }; + } + + // Encoding the data using dag-cbor codec. + const bytes = codec.encode(data); + + // Calculating sha256 (multi)hash of the encoded data. + const hash = await sha256.digest(bytes); + + // Calculating the CID: v1, code: dag-cbor, hash. + const cid = CID.create(1, codec.code, hash); + + // Update ipldBlock with new data. + ipldBlock = Object.assign(ipldBlock, { + block, + contractAddress, + cid: cid.toString(), + kind: data.meta.kind, + data: Buffer.from(bytes) + }); + + console.timeEnd('time:ipld-indexer#prepareIPLDBlock'); + return ipldBlock; + } + + async getIPLDBlocksByHash (blockHash: string): Promise { + const block = await this.getBlockProgress(blockHash); + assert(block); + + return this._db.getIPLDBlocks({ block }); + } + + async getIPLDBlockByCid (cid: string): Promise { + const ipldBlocks = await this._db.getIPLDBlocks({ cid }); + + // There can be only one IPLDBlock with a particular cid. + assert(ipldBlocks.length <= 1); + + return ipldBlocks[0]; + } + + async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlockInterface): Promise { + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.saveOrUpdateIPLDBlock(dbTx, ipldBlock); + + await dbTx.commitTransaction(); + + // Get IPLD status for the contract. + const ipldStatus = this._ipldStatusMap[res.contractAddress]; + assert(ipldStatus, `IPLD status for contract ${res.contractAddress} not found`); + + // Update the IPLD status for the kind. + ipldStatus[res.kind] = res.block.blockNumber; + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; + } + + async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise { + const dbTx = await this._db.createTransactionRunner(); + + try { + await this._db.removeIPLDBlocks(dbTx, blockNumber, kind); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + } + + async fetchIPLDStatus (): Promise { + const contracts = Object.values(this._watchedContracts); + + for (const contract of contracts) { + const initIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Init); + const diffIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Diff); + const diffStagedIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.DiffStaged); + const checkpointIPLDBlock = await this._db.getLatestIPLDBlock(contract.address, StateKind.Checkpoint); + + this._ipldStatusMap[contract.address] = { + init: initIPLDBlock?.block.blockNumber, + diff: diffIPLDBlock?.block.blockNumber, + diff_staged: diffStagedIPLDBlock?.block.blockNumber, + checkpoint: checkpointIPLDBlock?.block.blockNumber + }; + } + } + + async updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise { + // Get and update IPLD status for the contract. + const ipldStatusOld = this._ipldStatusMap[address]; + this._ipldStatusMap[address] = _.merge(ipldStatusOld, ipldStatus); + } + parseEvent (logDescription: ethers.utils.LogDescription): { eventName: string, eventInfo: any } { const eventName = logDescription.name; diff --git a/packages/util/src/ipld-database.ts b/packages/util/src/ipld-database.ts deleted file mode 100644 index e992d34e..00000000 --- a/packages/util/src/ipld-database.ts +++ /dev/null @@ -1,270 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import { Between, ConnectionOptions, FindConditions, Repository } from 'typeorm'; -import assert from 'assert'; -import { Pool } from 'pg'; - -import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types'; -import { Database } from './database'; -import { MAX_REORG_DEPTH } from './constants'; - -export class IPLDDatabase extends Database { - _pgPool: Pool - - constructor (config: ConnectionOptions) { - super(config); - - assert(config.type === 'postgres'); - this._pgPool = new Pool({ - user: config.username, - host: config.host, - database: config.database, - password: config.password, - port: config.port - }); - } - - async getLatestIPLDBlock (repo: Repository, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { - let queryBuilder = repo.createQueryBuilder('ipld_block') - .leftJoinAndSelect('ipld_block.block', 'block') - .where('block.is_pruned = false') - .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) - .orderBy('block.block_number', 'DESC'); - - // Filter out blocks after the provided block number. - if (blockNumber) { - queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber }); - } - - // Filter using kind if specified else avoid diff_staged block. - queryBuilder = kind - ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) - : queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged }); - - // Get the first three entries. - queryBuilder.limit(3); - - const results = await queryBuilder.getMany(); - - if (results.length) { - // Sort by (block number desc, id desc) to get the latest entry. - // At same height, IPLD blocks are expected in order ['init', 'diff', 'checkpoint'], - // and are given preference in order ['checkpoint', 'diff', 'init'] - results.sort((result1, result2) => { - if (result1.block.blockNumber === result2.block.blockNumber) { - return (result1.id > result2.id) ? -1 : 1; - } else { - return (result1.block.blockNumber > result2.block.blockNumber) ? -1 : 1; - } - }); - - return results[0]; - } - } - - async getPrevIPLDBlock (repo: Repository, blockHash: string, contractAddress: string, kind?: string): Promise { - const heirerchicalQuery = ` - WITH RECURSIVE cte_query AS - ( - SELECT - b.block_hash, - b.block_number, - b.parent_hash, - 1 as depth, - i.id, - i.kind - FROM - block_progress b - LEFT JOIN - ipld_block i ON i.block_id = b.id - AND i.contract_address = $2 - WHERE - b.block_hash = $1 - UNION ALL - SELECT - b.block_hash, - b.block_number, - b.parent_hash, - c.depth + 1, - i.id, - i.kind - FROM - block_progress b - LEFT JOIN - ipld_block i - ON i.block_id = b.id - AND i.contract_address = $2 - INNER JOIN - cte_query c ON c.parent_hash = b.block_hash - WHERE - c.depth < $3 - ) - SELECT - block_number, id, kind - FROM - cte_query - ORDER BY block_number DESC, id DESC - `; - - // Fetching block and id for previous IPLDBlock in frothy region. - const queryResult = await repo.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]); - const latestRequiredResult = kind - ? queryResult.find((obj: any) => obj.kind === kind) - : queryResult.find((obj: any) => obj.id); - - let result: IPLDBlockInterface | undefined; - - if (latestRequiredResult) { - result = await repo.findOne(latestRequiredResult.id, { relations: ['block'] }); - } else { - // If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region. - // Filter out IPLDBlocks from pruned blocks. - const canonicalBlockNumber = queryResult.pop().block_number + 1; - - let queryBuilder = repo.createQueryBuilder('ipld_block') - .leftJoinAndSelect('ipld_block.block', 'block') - .where('block.is_pruned = false') - .andWhere('ipld_block.contract_address = :contractAddress', { contractAddress }) - .andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber }) - .orderBy('block.block_number', 'DESC'); - - // Filter using kind if specified else order by id to give preference to checkpoint. - queryBuilder = kind - ? queryBuilder.andWhere('ipld_block.kind = :kind', { kind }) - : queryBuilder.addOrderBy('ipld_block.id', 'DESC'); - - // Get the first entry. - queryBuilder.limit(1); - - result = await queryBuilder.getOne(); - } - - return result; - } - - async getIPLDBlocks (repo: Repository, where: FindConditions): Promise { - return repo.find({ where, relations: ['block'] }); - } - - async getDiffIPLDBlocksInRange (repo: Repository, contractAddress: string, startblock: number, endBlock: number): Promise { - return repo.find({ - relations: ['block'], - where: { - contractAddress, - kind: StateKind.Diff, - block: { - isPruned: false, - blockNumber: Between(startblock + 1, endBlock) - } - }, - order: { - block: 'ASC' - } - }); - } - - async saveOrUpdateIPLDBlock (repo: Repository, ipldBlock: IPLDBlockInterface): Promise { - let updatedData: {[key: string]: any}; - - console.time('time:ipld-database#saveOrUpdateIPLDBlock-DB-query'); - if (ipldBlock.id) { - // Using pg query as workaround for typeorm memory issue when saving checkpoint with large sized data. - const { rows } = await this._pgPool.query(` - UPDATE ipld_block - SET block_id = $1, contract_address = $2, cid = $3, kind = $4, data = $5 - WHERE id = $6 - RETURNING * - `, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data, ipldBlock.id]); - - updatedData = rows[0]; - } else { - const { rows } = await this._pgPool.query(` - INSERT INTO ipld_block(block_id, contract_address, cid, kind, data) - VALUES($1, $2, $3, $4, $5) - RETURNING * - `, [ipldBlock.block.id, ipldBlock.contractAddress, ipldBlock.cid, ipldBlock.kind, ipldBlock.data]); - - updatedData = rows[0]; - } - console.timeEnd('time:ipld-database#saveOrUpdateIPLDBlock-DB-query'); - - assert(updatedData); - return { - block: ipldBlock.block, - contractAddress: updatedData.contract_address, - cid: updatedData.cid, - kind: updatedData.kind, - data: updatedData.data, - id: updatedData.id - }; - } - - async removeIPLDBlocks (repo: Repository, blockNumber: number, kind: string): Promise { - const entities = await repo.find({ relations: ['block'], where: { block: { blockNumber }, kind } }); - - // Delete if entities found. - if (entities.length) { - await repo.delete(entities.map((entity) => entity.id)); - } - } - - async removeIPLDBlocksAfterBlock (repo: Repository, blockNumber: number): Promise { - // Use raw SQL as TypeORM curently doesn't support delete via 'join' or 'using' - const deleteQuery = ` - DELETE FROM - ipld_block - USING block_progress - WHERE - ipld_block.block_id = block_progress.id - AND block_progress.block_number > $1; - `; - - await repo.query(deleteQuery, [blockNumber]); - } - - async getIPLDStatus (repo: Repository): Promise { - return repo.findOne(); - } - - async updateIPLDStatusHooksBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { - let entity = await repo.findOne(); - - if (!entity) { - entity = repo.create({ - latestHooksBlockNumber: blockNumber, - latestCheckpointBlockNumber: -1, - latestIPFSBlockNumber: -1 - }); - } - - if (force || blockNumber > entity.latestHooksBlockNumber) { - entity.latestHooksBlockNumber = blockNumber; - } - - return repo.save(entity); - } - - async updateIPLDStatusCheckpointBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { - const entity = await repo.findOne(); - assert(entity); - - if (force || blockNumber > entity.latestCheckpointBlockNumber) { - entity.latestCheckpointBlockNumber = blockNumber; - } - - return repo.save(entity); - } - - async updateIPLDStatusIPFSBlock (repo: Repository, blockNumber: number, force?: boolean): Promise { - const entity = await repo.findOne(); - assert(entity); - - if (force || blockNumber > entity.latestIPFSBlockNumber) { - entity.latestIPFSBlockNumber = blockNumber; - } - - return repo.save(entity); - } -} diff --git a/packages/util/src/ipld-indexer.ts b/packages/util/src/ipld-indexer.ts deleted file mode 100644 index 0423e0e3..00000000 --- a/packages/util/src/ipld-indexer.ts +++ /dev/null @@ -1,501 +0,0 @@ -// -// Copyright 2021 Vulcanize, Inc. -// - -import assert from 'assert'; -import { ethers } from 'ethers'; -import { sha256 } from 'multiformats/hashes/sha2'; -import { CID } from 'multiformats/cid'; -import _ from 'lodash'; - -import { EthClient } from '@cerc-io/ipld-eth-client'; -import * as codec from '@ipld/dag-cbor'; - -import { - IPLDDatabaseInterface, - IndexerInterface, - BlockProgressInterface, - IPLDBlockInterface, - StateKind -} from './types'; -import { Indexer } from './indexer'; -import { ServerConfig } from './config'; -import { IPFSClient } from './ipfs'; -import { JobQueue } from './job-queue'; - -export interface IpldStatus { - init?: number; - diff?: number; - checkpoint?: number; - // eslint-disable-next-line camelcase - diff_staged?: number; -} - -export class IPLDIndexer extends Indexer { - _serverConfig: ServerConfig; - _ipldDb: IPLDDatabaseInterface; - _ipfsClient: IPFSClient; - _ipldStatusMap: { [key: string]: IpldStatus } = {}; - - constructor ( - serverConfig: ServerConfig, - ipldDb: IPLDDatabaseInterface, - ethClient: EthClient, - ethProvider: ethers.providers.BaseProvider, - jobQueue: JobQueue, - ipfsClient: IPFSClient - ) { - super(ipldDb, ethClient, ethProvider, jobQueue); - - this._serverConfig = serverConfig; - this._ipldDb = ipldDb; - this._ipfsClient = ipfsClient; - } - - getIPLDData (ipldBlock: IPLDBlockInterface): any { - return codec.decode(Buffer.from(ipldBlock.data)); - } - - async pushToIPFS (data: any): Promise { - await this._ipfsClient.push(data); - } - - isIPFSConfigured (): boolean { - const ipfsAddr = this._serverConfig.ipfsApiAddr; - - // Return false if ipfsAddr is undefined | null | empty string. - return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== ''); - } - - async getLatestHooksProcessedBlock (): Promise { - // Get current hookStatus. - const ipldStatus = await this._ipldDb.getIPLDStatus(); - assert(ipldStatus, 'IPLD status not found'); - - // Get all the blocks at height hookStatus.latestProcessedBlockNumber. - const blocksAtHeight = await this.getBlocksAtHeight(ipldStatus.latestHooksBlockNumber, false); - - // There can exactly one block at hookStatus.latestProcessedBlockNumber height. - assert(blocksAtHeight.length === 1); - - return blocksAtHeight[0]; - } - - async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise { - // Get all the contracts. - const contracts = Object.values(this._watchedContracts); - - // Getting the block for checkpoint. - const block = await this.getBlockProgress(blockHash); - assert(block); - - // For each contract, merge the diff till now to create a checkpoint. - for (const contract of contracts) { - // Get IPLD status for the contract. - const ipldStatus = this._ipldStatusMap[contract.address]; - assert(ipldStatus, `IPLD status for contract ${contract.address} not found`); - - const initBlockNumber = ipldStatus.init; - - // Check if contract has checkpointing on. - // Check if it's time for a checkpoint or the init is in current block. - if ( - contract.checkpoint && - (block.blockNumber % checkpointInterval === 0 || initBlockNumber === block.blockNumber) - ) { - await this.createCheckpoint(indexer, contract.address, block); - } - } - } - - async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise { - // Getting the block for checkpoint. - let block; - - if (blockHash) { - block = await this.getBlockProgress(blockHash); - } else { - // In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint. - block = await this.getLatestHooksProcessedBlock(); - } - - assert(block); - - const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, block); - assert(checkpointBlockHash, 'Checkpoint not created'); - - // Push checkpoint to IPFS if configured. - if (this.isIPFSConfigured()) { - const checkpointIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind: StateKind.Checkpoint }); - - // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. - assert(checkpointIPLDBlocks.length <= 1); - const checkpointIPLDBlock = checkpointIPLDBlocks[0]; - - const checkpointData = this.getIPLDData(checkpointIPLDBlock); - await this.pushToIPFS(checkpointData); - } - - return checkpointBlockHash; - } - - async createStateCheckpoint (contractAddress: string, block: BlockProgressInterface, data: any): Promise { - // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); - - if (block.blockNumber < contract.startingBlock) { - return; - } - - // Create a checkpoint from the hook data without being concerned about diffs. - const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Checkpoint); - await this.saveOrUpdateIPLDBlock(ipldBlock); - } - - async createInit ( - indexer: IndexerInterface, - blockHash: string, - blockNumber: number - ): Promise { - // Get all the contracts. - const contracts = Object.values(this._watchedContracts); - - // Create an initial state for each contract. - for (const contract of contracts) { - // Check if contract has checkpointing on. - if (contract.checkpoint) { - // Check if starting block not reached yet. - if (blockNumber < contract.startingBlock) { - continue; - } - - // Get IPLD status for the contract. - const ipldStatus = this._ipldStatusMap[contract.address]; - assert(ipldStatus, `IPLD status for contract ${contract.address} not found`); - - // Check if a 'init' IPLDBlock already exists. - // Or if a 'diff' IPLDBlock already exists. - // Or if a 'checkpoint' IPLDBlock already exists. - // (A watcher with imported state won't have an init IPLDBlock, but it will have the imported checkpoint) - if ( - ipldStatus.init || - ipldStatus.diff || - ipldStatus.checkpoint - ) { - continue; - } - - // Call initial state hook. - assert(indexer.processInitialState); - const stateData = await indexer.processInitialState(contract.address, blockHash); - - const block = await this.getBlockProgress(blockHash); - assert(block); - - const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, StateKind.Init); - await this.saveOrUpdateIPLDBlock(ipldBlock); - - // Push initial state to IPFS if configured. - if (this.isIPFSConfigured()) { - const ipldData = this.getIPLDData(ipldBlock); - await this.pushToIPFS(ipldData); - } - } - } - } - - async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { - const block = await this.getBlockProgress(blockHash); - assert(block); - - // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); - - if (block.blockNumber < contract.startingBlock) { - return; - } - - // Create a staged diff block. - const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.DiffStaged); - await this.saveOrUpdateIPLDBlock(ipldBlock); - } - - async finalizeDiffStaged (blockHash: string): Promise { - const block = await this.getBlockProgress(blockHash); - assert(block); - - // Get all the staged diff blocks for the given blockHash. - const stagedBlocks = await this._ipldDb.getIPLDBlocks({ block, kind: StateKind.DiffStaged }); - - // For each staged block, create a diff block. - for (const stagedBlock of stagedBlocks) { - const data = codec.decode(Buffer.from(stagedBlock.data)); - await this.createDiff(stagedBlock.contractAddress, block, data); - } - - // Remove all the staged diff blocks for current blockNumber. - // (Including staged diff blocks associated with pruned blocks) - await this.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged); - } - - async createDiff (contractAddress: string, block: BlockProgressInterface, data: any): Promise { - // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); - - if (block.blockNumber < contract.startingBlock) { - return; - } - - // Get IPLD status for the contract. - const ipldStatus = this._ipldStatusMap[contractAddress]; - assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`); - - // Get the latest checkpoint block number. - const checkpointBlockNumber = ipldStatus.checkpoint; - - if (!checkpointBlockNumber) { - // Get the initial state block number. - const initBlockNumber = ipldStatus.init; - - // There should be an initial state at least. - assert(initBlockNumber, 'No initial state found'); - } else if (checkpointBlockNumber === block.blockNumber) { - // Check if the latest checkpoint is in the same block if block number is same. - const checkpoint = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint); - assert(checkpoint); - - assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash'); - } - - const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Diff); - await this.saveOrUpdateIPLDBlock(ipldBlock); - } - - async createCheckpoint (indexer: IndexerInterface, contractAddress: string, currentBlock: BlockProgressInterface): Promise { - // Get the contract. - const contract = this._watchedContracts[contractAddress]; - assert(contract, `Contract ${contractAddress} not watched`); - - if (currentBlock.blockNumber < contract.startingBlock) { - return; - } - - // Make sure the block is marked complete. - assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete'); - - // Get current hookStatus. - const ipldStatus = await this._ipldDb.getIPLDStatus(); - assert(ipldStatus); - - // Make sure the hooks have been processed for the block. - assert(currentBlock.blockNumber <= ipldStatus.latestHooksBlockNumber, 'Block for a checkpoint should have hooks processed'); - - // Call state checkpoint hook and check if default checkpoint is disabled. - assert(indexer.processStateCheckpoint); - const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash); - - if (disableDefaultCheckpoint) { - // Return if default checkpoint is disabled. - // Return block hash for checkpoint CLI. - return currentBlock.blockHash; - } - - // Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it. - let prevNonDiffBlock: IPLDBlockInterface; - let diffStartBlockNumber: number; - const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber - 1); - - if (checkpointBlock) { - const checkpointBlockNumber = checkpointBlock.block.blockNumber; - - prevNonDiffBlock = checkpointBlock; - diffStartBlockNumber = checkpointBlockNumber; - - // Update IPLD status map with the latest checkpoint info. - // Essential while importing state as checkpoint at the snapshot block is added by import-state CLI. - // (job-runner won't have the updated ipld status) - this.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber }); - } else { - // There should be an initial state at least. - const initBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Init); - assert(initBlock, 'No initial state found'); - - prevNonDiffBlock = initBlock; - // Take block number previous to initial state block as the checkpoint is to be created in the same block. - diffStartBlockNumber = initBlock.block.blockNumber - 1; - } - - // Fetching all diff blocks after the latest 'checkpoint' | 'init'. - const diffBlocks = await this._ipldDb.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, currentBlock.blockNumber); - - const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any; - const data = { - state: prevNonDiffBlockData.state - }; - - for (const diffBlock of diffBlocks) { - const diff = codec.decode(Buffer.from(diffBlock.data)) as any; - data.state = _.merge(data.state, diff.state); - } - - const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, StateKind.Checkpoint); - await this.saveOrUpdateIPLDBlock(ipldBlock); - - return currentBlock.blockHash; - } - - async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise { - console.time('time:ipld-indexer#prepareIPLDBlock'); - let ipldBlock: IPLDBlockInterface; - - // Get IPLD status for the contract. - const ipldStatus = this._ipldStatusMap[contractAddress]; - assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`); - - // Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress to update. - let currentIPLDBlock: IPLDBlockInterface | undefined; - const prevIPLDBlockNumber = ipldStatus[kind]; - - // Fetch from DB for previous IPLD block or for checkpoint kind. - if (kind === 'checkpoint' || (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber)) { - const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind }); - - // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. - assert(currentIPLDBlocks.length <= 1); - currentIPLDBlock = currentIPLDBlocks[0]; - } - - if (currentIPLDBlock) { - // Update current IPLDBlock of same kind if it exists. - ipldBlock = currentIPLDBlock; - - // Update the data field. - const oldData = codec.decode(Buffer.from(ipldBlock.data)); - data = _.merge(oldData, data); - } else { - // Create a new IPLDBlock instance. - ipldBlock = this._ipldDb.getNewIPLDBlock(); - - // Fetch the parent IPLDBlock. - const parentIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, null, block.blockNumber); - - // Setting the meta-data for an IPLDBlock (done only once per IPLD block). - data.meta = { - id: contractAddress, - kind, - parent: { - '/': parentIPLDBlock ? parentIPLDBlock.cid : null - }, - ethBlock: { - cid: { - '/': block.cid - }, - num: block.blockNumber - } - }; - } - - // Encoding the data using dag-cbor codec. - const bytes = codec.encode(data); - - // Calculating sha256 (multi)hash of the encoded data. - const hash = await sha256.digest(bytes); - - // Calculating the CID: v1, code: dag-cbor, hash. - const cid = CID.create(1, codec.code, hash); - - // Update ipldBlock with new data. - ipldBlock = Object.assign(ipldBlock, { - block, - contractAddress, - cid: cid.toString(), - kind: data.meta.kind, - data: Buffer.from(bytes) - }); - - console.timeEnd('time:ipld-indexer#prepareIPLDBlock'); - return ipldBlock; - } - - async getIPLDBlocksByHash (blockHash: string): Promise { - const block = await this.getBlockProgress(blockHash); - assert(block); - - return this._ipldDb.getIPLDBlocks({ block }); - } - - async getIPLDBlockByCid (cid: string): Promise { - const ipldBlocks = await this._ipldDb.getIPLDBlocks({ cid }); - - // There can be only one IPLDBlock with a particular cid. - assert(ipldBlocks.length <= 1); - - return ipldBlocks[0]; - } - - async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlockInterface): Promise { - const dbTx = await this._db.createTransactionRunner(); - let res; - - try { - res = await this._ipldDb.saveOrUpdateIPLDBlock(dbTx, ipldBlock); - - await dbTx.commitTransaction(); - - // Get IPLD status for the contract. - const ipldStatus = this._ipldStatusMap[res.contractAddress]; - assert(ipldStatus, `IPLD status for contract ${res.contractAddress} not found`); - - // Update the IPLD status for the kind. - ipldStatus[res.kind] = res.block.blockNumber; - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - - return res; - } - - async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise { - const dbTx = await this._db.createTransactionRunner(); - - try { - await this._ipldDb.removeIPLDBlocks(dbTx, blockNumber, kind); - await dbTx.commitTransaction(); - } catch (error) { - await dbTx.rollbackTransaction(); - throw error; - } finally { - await dbTx.release(); - } - } - - async fetchIPLDStatus (): Promise { - const contracts = Object.values(this._watchedContracts); - - for (const contract of contracts) { - const initIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Init); - const diffIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Diff); - const diffStagedIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.DiffStaged); - const checkpointIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Checkpoint); - - this._ipldStatusMap[contract.address] = { - init: initIPLDBlock?.block.blockNumber, - diff: diffIPLDBlock?.block.blockNumber, - diff_staged: diffStagedIPLDBlock?.block.blockNumber, - checkpoint: checkpointIPLDBlock?.block.blockNumber - }; - } - } - - async updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise { - // Get and update IPLD status for the contract. - const ipldStatusOld = this._ipldStatusMap[address]; - this._ipldStatusMap[address] = _.merge(ipldStatusOld, ipldStatus); - } -} diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 68ecf4da..464c4492 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -17,7 +17,7 @@ import { QUEUE_EVENT_PROCESSING } from './constants'; import { JobQueue } from './job-queue'; -import { EventInterface, IndexerInterface, IPLDIndexerInterface, SyncStatusInterface } from './types'; +import { EventInterface, IndexerInterface, SyncStatusInterface } from './types'; import { wait } from './misc'; import { createPruningJob, processBatchEvents } from './common'; import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber } from './metrics'; @@ -25,7 +25,7 @@ import { lastBlockNumEvents, lastBlockProcessDuration, lastProcessedBlockNumber const log = debug('vulcanize:job-runner'); export class JobRunner { - _indexer: IndexerInterface | IPLDIndexerInterface + _indexer: IndexerInterface _jobQueue: JobQueue _jobQueueConfig: JobQueueConfig _blockProcessStartTime?: Date @@ -246,13 +246,11 @@ export class JobRunner { // Delay required to process block. await wait(jobDelayInMilliSecs); console.time('time:job-runner#_indexBlock-fetch-block-events'); - blockProgress = await this._indexer.fetchBlockEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); + blockProgress = await this._indexer.fetchBlockWithEvents({ cid, blockHash, blockNumber, parentHash, blockTimestamp: timestamp }); console.timeEnd('time:job-runner#_indexBlock-fetch-block-events'); } - if (this._indexer.processBlock) { - await this._indexer.processBlock(blockProgress); - } + await this._indexer.processBlock(blockProgress); // Push job to event processing queue. // Block with all events processed or no events will not be processed again due to check in _processEvents. @@ -299,7 +297,7 @@ export class JobRunner { assert(this._indexer.cacheContract); this._indexer.cacheContract(contract); - const ipldIndexer = this._indexer as IPLDIndexerInterface; + const ipldIndexer = this._indexer; if (ipldIndexer.updateIPLDStatusMap) { ipldIndexer.updateIPLDStatusMap(contract.address, {}); } diff --git a/packages/util/src/metrics.ts b/packages/util/src/metrics.ts index c504d2e3..9a0130b6 100644 --- a/packages/util/src/metrics.ts +++ b/packages/util/src/metrics.ts @@ -68,6 +68,11 @@ export const eventProcessingLoadEntityDBQueryDuration = new client.Histogram({ help: 'Duration of DB query made in event processing' }); +export const eventProcessingEthCallDuration = new client.Histogram({ + name: 'event_processing_eth_call_duration_seconds', + help: 'Duration of eth_calls made in event processing' +}); + // Export metrics on a server const app: Application = express(); @@ -131,7 +136,7 @@ const registerDBSizeMetrics = async ({ database, jobQueue }: Config): Promise; getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> getAncestorAtDepth (blockHash: string, depth: number): Promise - fetchBlockEvents (block: DeepPartial): Promise + fetchBlockWithEvents (block: DeepPartial): Promise removeUnknownEvents (block: BlockProgressInterface): Promise updateBlockProgress (block: BlockProgressInterface, lastProcessedEventIndex: number): Promise updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise @@ -101,7 +100,7 @@ export interface IndexerInterface { saveEventEntity (dbEvent: EventInterface): Promise; processEvent (event: EventInterface): Promise; parseEventNameAndArgs?: (kind: string, logObj: any) => any; - isWatchedContract?: (address: string) => Promise; + isWatchedContract: (address: string) => ContractInterface | undefined; getContractsByKind?: (kind: string) => ContractInterface[]; cacheContract?: (contract: ContractInterface) => void; watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise @@ -110,13 +109,10 @@ export interface IndexerInterface { createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise processInitialState?: (contractAddress: string, blockHash: string) => Promise processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise - processBlock?: (blockProgres: BlockProgressInterface) => Promise + processBlock: (blockProgres: BlockProgressInterface) => Promise processBlockAfterEvents?: (blockHash: string) => Promise getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise updateSubgraphState?: (contractAddress: string, data: any) => void -} - -export interface IPLDIndexerInterface extends IndexerInterface { updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise getIPLDData (ipldBlock: IPLDBlockInterface): any } @@ -140,18 +136,17 @@ export interface DatabaseInterface { getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }>; getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise>; markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise; + saveBlockProgress (queryRunner: QueryRunner, block: DeepPartial): Promise; updateBlockProgress (queryRunner: QueryRunner, block: BlockProgressInterface, lastProcessedEventIndex: number): Promise updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise; - saveEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; + saveEvents (queryRunner: QueryRunner, events: DeepPartial[]): Promise; + saveBlockWithEvents (queryRunner: QueryRunner, block: DeepPartial, events: DeepPartial[]): Promise; saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise; removeEntities (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindManyOptions | FindConditions): Promise; getContracts?: () => Promise saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise -} - -export interface IPLDDatabaseInterface extends DatabaseInterface { getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise getIPLDBlocks (where: FindConditions): Promise getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise