// // Copyright 2021 Vulcanize, Inc. // import assert from 'assert'; import debug from 'debug'; import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import { JsonFragment } from '@ethersproject/abi'; import { BaseProvider } from '@ethersproject/providers'; import * as codec from '@ipld/dag-cbor'; import { EthClient } from '@vulcanize/ipld-eth-client'; import { StorageLayout, MappingKey } from '@vulcanize/solidity-mapper'; import { IPLDIndexer as BaseIndexer, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType, JobQueue, Where, QueryOptions, BlockHeight, IPFSClient, StateKind, IPLDIndexerInterface, IpldStatus as IpldStatusInterface } from '@vulcanize/util'; import { GraphWatcher } from '@vulcanize/graph-node'; import { Database } from './database'; import { Contract } from './entity/Contract'; import { Event } from './entity/Event'; import { SyncStatus } from './entity/SyncStatus'; import { IpldStatus } from './entity/IpldStatus'; import { BlockProgress } from './entity/BlockProgress'; import { IPLDBlock } from './entity/IPLDBlock'; import Example1Artifacts from './artifacts/Example.json'; import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint } from './hooks'; import { Author } from './entity/Author'; import { Blog } from './entity/Blog'; import { Category } from './entity/Category'; const log = debug('vulcanize:indexer'); const JSONbigNative = JSONbig({ useNativeBigInt: true }); const KIND_EXAMPLE1 = 'Example1'; export type ResultEvent = { block: { cid: string; hash: string; number: number; timestamp: number; parentHash: string; }; tx: { hash: string; from: string; to: string; index: number; }; contract: string; eventIndex: number; eventSignature: string; event: any; proof: string; }; export type ResultIPLDBlock = { block: { cid: string; hash: string; number: number; timestamp: number; parentHash: string; }; contractAddress: string; cid: string; kind: string; data: string; }; export class Indexer implements IPLDIndexerInterface { _db: Database _ethClient: EthClient _ethProvider: BaseProvider _baseIndexer: BaseIndexer _serverConfig: ServerConfig _graphWatcher: GraphWatcher; _abiMap: Map _storageLayoutMap: Map _contractMap: Map _ipfsClient: IPFSClient _entityTypesMap: Map _relationsMap: Map constructor (serverConfig: ServerConfig, db: Database, ethClient: EthClient, ethProvider: BaseProvider, jobQueue: JobQueue, graphWatcher: GraphWatcher) { assert(db); assert(ethClient); this._db = db; this._ethClient = ethClient; this._ethProvider = ethProvider; this._serverConfig = serverConfig; this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr); this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._ethProvider, jobQueue, this._ipfsClient); this._graphWatcher = graphWatcher; this._abiMap = new Map(); this._storageLayoutMap = new Map(); this._contractMap = new Map(); const { abi: Example1ABI, storageLayout: Example1StorageLayout } = Example1Artifacts; assert(Example1ABI); assert(Example1StorageLayout); this._abiMap.set(KIND_EXAMPLE1, Example1ABI); this._storageLayoutMap.set(KIND_EXAMPLE1, Example1StorageLayout); this._contractMap.set(KIND_EXAMPLE1, new ethers.utils.Interface(Example1ABI)); this._entityTypesMap = new Map(); this._populateEntityTypesMap(); this._relationsMap = new Map(); this._populateRelationsMap(); } get serverConfig (): ServerConfig { return this._serverConfig; } get storageLayoutMap (): Map { return this._storageLayoutMap; } async init (): Promise { await this._baseIndexer.fetchContracts(); await this._baseIndexer.fetchIPLDStatus(); } getResultEvent (event: Event): ResultEvent { const block = event.block; const eventFields = JSONbigNative.parse(event.eventInfo); const { tx, eventSignature } = JSONbigNative.parse(event.extraInfo); return { block: { cid: block.cid, hash: block.blockHash, number: block.blockNumber, timestamp: block.blockTimestamp, parentHash: block.parentHash }, tx: { hash: event.txHash, from: tx.src, to: tx.dst, index: tx.index }, contract: event.contract, eventIndex: event.index, eventSignature, event: { __typename: `${event.eventName}Event`, ...eventFields }, // TODO: Return proof only if requested. proof: JSON.parse(event.proof) }; } getResultIPLDBlock (ipldBlock: IPLDBlock): ResultIPLDBlock { const block = ipldBlock.block; const data = codec.decode(Buffer.from(ipldBlock.data)) as any; return { block: { cid: block.cid, hash: block.blockHash, number: block.blockNumber, timestamp: block.blockTimestamp, parentHash: block.parentHash }, contractAddress: ipldBlock.contractAddress, cid: ipldBlock.cid, kind: ipldBlock.kind, data: JSON.stringify(data) }; } async getMethod (blockHash: string, contractAddress: string): Promise { const entity = await this._db.getGetMethod({ blockHash, contractAddress }); if (entity) { log('getMethod: db hit.'); return { value: entity.value, proof: JSON.parse(entity.proof) }; } log('getMethod: db miss, fetching from upstream server'); const { block: { number } } = await this._ethClient.getBlockByHash(blockHash); const blockNumber = ethers.BigNumber.from(number).toNumber(); const abi = this._abiMap.get(KIND_EXAMPLE1); assert(abi); const contract = new ethers.Contract(contractAddress, abi, this._ethProvider); const value = await contract.getMethod({ blockTag: blockHash }); const result: ValueResult = { value }; await this._db.saveGetMethod({ blockHash, blockNumber, contractAddress, value: result.value, proof: JSONbigNative.stringify(result.proof) }); return result; } async _test (blockHash: string, contractAddress: string, diff = false): Promise { const entity = await this._db._getTest({ blockHash, contractAddress }); if (entity) { log('_test: db hit.'); return { value: entity.value, proof: JSON.parse(entity.proof) }; } log('_test: db miss, fetching from upstream server'); const { block: { number } } = await this._ethClient.getBlockByHash(blockHash); const blockNumber = ethers.BigNumber.from(number).toNumber(); const storageLayout = this._storageLayoutMap.get(KIND_EXAMPLE1); assert(storageLayout); const result = await this._baseIndexer.getStorageValue( storageLayout, blockHash, contractAddress, '_test' ); await this._db._saveTest({ blockHash, blockNumber, contractAddress, value: result.value, proof: JSONbigNative.stringify(result.proof) }); if (diff) { const stateUpdate = updateStateForElementaryType({}, '_test', result.value.toString()); await this.createDiffStaged(contractAddress, blockHash, stateUpdate); } return result; } async getStorageValue (storageLayout: StorageLayout, blockHash: string, contractAddress: string, variable: string, ...mappingKeys: MappingKey[]): Promise { return this._baseIndexer.getStorageValue( storageLayout, blockHash, contractAddress, variable, ...mappingKeys ); } async pushToIPFS (data: any): Promise { await this._baseIndexer.pushToIPFS(data); } async processInitialState (contractAddress: string, blockHash: string): Promise { // Call initial state hook. return createInitialState(this, contractAddress, blockHash); } async processStateCheckpoint (contractAddress: string, blockHash: string): Promise { // Call checkpoint hook. return createStateCheckpoint(this, contractAddress, blockHash); } async processCanonicalBlock (blockHash: string): Promise { // Finalize staged diff blocks if any. await this._baseIndexer.finalizeDiffStaged(blockHash); // Call custom stateDiff hook. await createStateDiff(this, blockHash); } async processCheckpoint (blockHash: string): Promise { // Return if checkpointInterval is <= 0. const checkpointInterval = this._serverConfig.checkpointInterval; if (checkpointInterval <= 0) return; await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval); } async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise { return this._baseIndexer.processCLICheckpoint(this, contractAddress, blockHash); } async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise { return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind); } async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise { return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber); } async getIPLDBlocksByHash (blockHash: string): Promise { return this._baseIndexer.getIPLDBlocksByHash(blockHash); } async getIPLDBlockByCid (cid: string): Promise { return this._baseIndexer.getIPLDBlockByCid(cid); } getIPLDData (ipldBlock: IPLDBlock): any { return this._baseIndexer.getIPLDData(ipldBlock); } isIPFSConfigured (): boolean { return this._baseIndexer.isIPFSConfigured(); } // Method used to create auto diffs (diff_staged). async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise { await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data); } // Method to be used by createStateDiff hook. async createDiff (contractAddress: string, blockHash: string, data: any): Promise { const block = await this.getBlockProgress(blockHash); assert(block); await this._baseIndexer.createDiff(contractAddress, block, data); } // Method to be used by createStateCheckpoint hook. async createStateCheckpoint (contractAddress: string, blockHash: string, data: any): Promise { const block = await this.getBlockProgress(blockHash); assert(block); return this._baseIndexer.createStateCheckpoint(contractAddress, block, data); } // Method to be used by checkpoint CLI. async createCheckpoint (contractAddress: string, blockHash: string): Promise { const block = await this.getBlockProgress(blockHash); assert(block); return this._baseIndexer.createCheckpoint(this, contractAddress, block); } async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise { return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock); } async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise { await this._baseIndexer.removeIPLDBlocks(blockNumber, kind); } async getSubgraphEntity (entity: new () => Entity, id: string, block: BlockHeight): Promise { const data = await this._graphWatcher.getEntity(entity, id, this._relationsMap, block); return data; } async triggerIndexingOnEvent (event: Event): Promise { const resultEvent = this.getResultEvent(event); // Call subgraph handler for event. await this._graphWatcher.handleEvent(resultEvent); // Call custom hook function for indexing on event. await handleEvent(this, resultEvent); } async processEvent (event: Event): Promise { // Trigger indexing of data based on the event. await this.triggerIndexingOnEvent(event); } async processBlock (blockHash: string, blockNumber: number): Promise { // Call a function to create initial state for contracts. await this._baseIndexer.createInit(this, blockHash, blockNumber); } async processBlockAfterEvents (blockHash: string): Promise { // Call subgraph handler for block. await this._graphWatcher.handleBlock(blockHash); } parseEventNameAndArgs (kind: string, logObj: any): any { const { topics, data } = logObj; const contract = this._contractMap.get(kind); assert(contract); const logDescription = contract.parseLog({ data, topics }); const { eventName, eventInfo } = this._baseIndexer.parseEvent(logDescription); return { eventName, eventInfo, eventSignature: logDescription.signature }; } async getIPLDStatus (): Promise { return this._db.getIPLDStatus(); } async updateIPLDStatusHooksBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { res = await this._db.updateIPLDStatusHooksBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); throw error; } finally { await dbTx.release(); } return res; } async updateIPLDStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { res = await this._db.updateIPLDStatusCheckpointBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); throw error; } finally { await dbTx.release(); } return res; } async updateIPLDStatusIPFSBlock (blockNumber: number, force?: boolean): Promise { const dbTx = await this._db.createTransactionRunner(); let res; try { res = await this._db.updateIPLDStatusIPFSBlock(dbTx, blockNumber, force); await dbTx.commitTransaction(); } catch (error) { await dbTx.rollbackTransaction(); throw error; } finally { await dbTx.release(); } return res; } async getLatestCanonicalBlock (): Promise { const syncStatus = await this.getSyncStatus(); assert(syncStatus); const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); assert(latestCanonicalBlock); return latestCanonicalBlock; } async getLatestHooksProcessedBlock (): Promise { return this._baseIndexer.getLatestHooksProcessedBlock(); } async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { await this.updateIPLDStatusMap(address, {}); return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock); } async updateIPLDStatusMap (address: string, ipldStatus: IpldStatusInterface): Promise { await this._baseIndexer.updateIPLDStatusMap(address, ipldStatus); } cacheContract (contract: Contract): void { return this._baseIndexer.cacheContract(contract); } async saveEventEntity (dbEvent: Event): Promise { return this._baseIndexer.saveEventEntity(dbEvent); } async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } async isWatchedContract (address : string): Promise { return this._baseIndexer.isWatchedContract(address); } getContractsByKind (kind: string): Contract[] { return this._baseIndexer.getContractsByKind(kind); } async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { return this._baseIndexer.getProcessedBlockCountForRange(fromBlockNumber, toBlockNumber); } async getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise> { return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber); } async getSyncStatus (): Promise { return this._baseIndexer.getSyncStatus(); } async getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise { return this._baseIndexer.getBlocks(blockFilter); } async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force); } async updateSyncStatusChainHead (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber, force); } async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise { return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force); } async getEvent (id: string): Promise { return this._baseIndexer.getEvent(id); } async getBlockProgress (blockHash: string): Promise { return this._baseIndexer.getBlockProgress(blockHash); } async getBlockProgressEntities (where: FindConditions, options: FindManyOptions): Promise { return this._baseIndexer.getBlockProgressEntities(where, options); } async getBlocksAtHeight (height: number, isPruned: boolean): Promise { return this._baseIndexer.getBlocksAtHeight(height, isPruned); } async fetchBlockEvents (block: DeepPartial): Promise { return this._baseIndexer.fetchBlockEvents(block, this._fetchAndSaveEvents.bind(this)); } async getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise> { return this._baseIndexer.getBlockEvents(blockHash, where, queryOptions); } async removeUnknownEvents (block: BlockProgress): Promise { return this._baseIndexer.removeUnknownEvents(Event, block); } async markBlocksAsPruned (blocks: BlockProgress[]): Promise { return this._baseIndexer.markBlocksAsPruned(blocks); } async updateBlockProgress (block: BlockProgress, lastProcessedEventIndex: number): Promise { return this._baseIndexer.updateBlockProgress(block, lastProcessedEventIndex); } async getAncestorAtDepth (blockHash: string, depth: number): Promise { return this._baseIndexer.getAncestorAtDepth(blockHash, depth); } getEntityTypesMap (): Map { return this._entityTypesMap; } _populateEntityTypesMap (): void { this._entityTypesMap.set( 'Author', { id: 'ID', blogCount: 'BigInt', name: 'String', rating: 'BigDecimal', paramInt: 'Int', paramBigInt: 'BigInt', paramBytes: 'Bytes' } ); this._entityTypesMap.set( 'Blog', { id: 'ID', kind: 'BlogKind', isActive: 'Boolean', reviews: 'BigInt', author: 'Author', categories: 'Category' } ); this._entityTypesMap.set( 'Category', { id: 'ID', name: 'String', count: 'BigInt' } ); } _populateRelationsMap (): void { // Needs to be generated by codegen. this._relationsMap.set(Author, { blogs: { entity: Blog, isDerived: true, isArray: true, field: 'author' } }); this._relationsMap.set(Blog, { author: { entity: Author, isDerived: false, isArray: false }, categories: { entity: Category, isDerived: false, isArray: true } }); } async _fetchAndSaveEvents ({ cid: blockCid, blockHash }: DeepPartial): Promise { assert(blockHash); const transactionsPromise = this._ethClient.getBlockWithTransactions({ blockHash }); const blockPromise = this._ethClient.getBlockByHash(blockHash); let logs: any[]; console.time('time:indexer#_fetchAndSaveEvents-fetch-logs'); if (this._serverConfig.filterLogs) { const watchedContracts = this._baseIndexer.getWatchedContracts(); // TODO: Query logs by multiple contracts. const contractlogsPromises = watchedContracts.map((watchedContract): Promise => this._ethClient.getLogs({ blockHash, contract: watchedContract.address })); const contractlogs = await Promise.all(contractlogsPromises); // Flatten logs by contract and sort by index. logs = contractlogs.map(data => { return data.logs; }).flat() .sort((a, b) => { return a.index - b.index; }); } else { ({ logs } = await this._ethClient.getLogs({ blockHash })); } console.timeEnd('time:indexer#_fetchAndSaveEvents-fetch-logs'); let [ { block }, { allEthHeaderCids: { nodes: [ { ethTransactionCidsByHeaderId: { nodes: transactions } } ] } } ] = await Promise.all([blockPromise, transactionsPromise]); const transactionMap = transactions.reduce((acc: {[key: string]: any}, transaction: {[key: string]: any}) => { acc[transaction.txHash] = transaction; return acc; }, {}); const dbEvents: Array> = []; for (let li = 0; li < logs.length; li++) { const logObj = logs[li]; const { topics, data, index: logIndex, cid, ipldBlock, account: { address }, transaction: { hash: txHash }, receiptCID, status } = logObj; if (status) { let eventName = UNKNOWN_EVENT_NAME; let eventInfo = {}; const tx = transactionMap[txHash]; const extraInfo: { [key: string]: any } = { topics, data, tx }; const contract = ethers.utils.getAddress(address); const watchedContract = await this.isWatchedContract(contract); if (watchedContract) { const eventDetails = this.parseEventNameAndArgs(watchedContract.kind, logObj); eventName = eventDetails.eventName; eventInfo = eventDetails.eventInfo; extraInfo.eventSignature = eventDetails.eventSignature; } dbEvents.push({ index: logIndex, txHash, contract, eventName, eventInfo: JSONbigNative.stringify(eventInfo), extraInfo: JSONbigNative.stringify(extraInfo), proof: JSONbigNative.stringify({ data: JSONbigNative.stringify({ blockHash, receiptCID, log: { cid, ipldBlock } }) }) }); } else { log(`Skipping event for receipt ${receiptCID} due to failed transaction.`); } } const dbTx = await this._db.createTransactionRunner(); try { block = { cid: blockCid, blockHash, blockNumber: block.number, blockTimestamp: block.timestamp, parentHash: block.parent.hash }; console.time('time:indexer#_fetchAndSaveEvents-save-block-events'); const blockProgress = await this._db.saveEvents(dbTx, block, dbEvents); await dbTx.commitTransaction(); console.timeEnd('time:indexer#_fetchAndSaveEvents-save-block-events'); return blockProgress; } catch (error) { await dbTx.rollbackTransaction(); throw error; } finally { await dbTx.release(); } } }