mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-07 20:08:06 +00:00
Store initial state and refactor IPLDBlocks related code (#76)
* Process hooks for the first block in the watchers * Store initial state from a hook in an IPLDBlock for eden-watcher * Extract watcher method to prepare IPLDBlocks in util * Extract common IPLDBlocks related code in util * Move IPFSClient to util * Use constants for state kind
This commit is contained in:
parent
4c422e3ea2
commit
cb2fe2aa45
@ -9,7 +9,7 @@ import debug from 'debug';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
|
||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, STATE_KIND_CHECKPOINT } from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
|
||||
@ -93,7 +93,7 @@ const main = async (): Promise<void> => {
|
||||
if (contract.checkpoint) {
|
||||
await indexer.createCheckpoint(contract.address, block.blockHash);
|
||||
|
||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber);
|
||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, STATE_KIND_CHECKPOINT, block.blockNumber);
|
||||
assert(ipldBlock);
|
||||
|
||||
const data = indexer.getIPLDData(ipldBlock);
|
||||
|
@ -11,7 +11,7 @@ import { PubSub } from 'apollo-server-express';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients } from '@vulcanize/util';
|
||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, STATE_KIND_INIT, STATE_KIND_DIFF_STAGED } from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
|
||||
@ -108,11 +108,12 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));
|
||||
|
||||
await db.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
// The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||
await indexer.removeStagedIPLDBlocks(block.blockNumber);
|
||||
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_INIT);
|
||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
||||
};
|
||||
|
||||
main().catch(err => {
|
||||
|
@ -3,10 +3,10 @@
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, MoreThan } from 'typeorm';
|
||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
||||
import path from 'path';
|
||||
|
||||
import { Database as BaseDatabase, MAX_REORG_DEPTH, DatabaseInterface } from '@vulcanize/util';
|
||||
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface } from '@vulcanize/util';
|
||||
|
||||
import { Contract } from './entity/Contract';
|
||||
import { Event } from './entity/Event';
|
||||
@ -15,7 +15,7 @@ import { HookStatus } from './entity/HookStatus';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { IPLDBlock } from './entity/IPLDBlock';
|
||||
|
||||
export class Database implements DatabaseInterface {
|
||||
export class Database implements IPLDDatabaseInterface {
|
||||
_config: ConnectionOptions;
|
||||
_conn!: Connection;
|
||||
_baseDatabase: BaseDatabase;
|
||||
@ -39,156 +39,55 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.close();
|
||||
}
|
||||
|
||||
getNewIPLDBlock (): IPLDBlock {
|
||||
return new IPLDBlock();
|
||||
}
|
||||
|
||||
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
return repo.find({ where, relations: ['block'] });
|
||||
|
||||
return this._baseDatabase.getIPLDBlocks(repo, where);
|
||||
}
|
||||
|
||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
|
||||
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter out blocks after the provided block number.
|
||||
if (blockNumber) {
|
||||
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
|
||||
}
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: 'diff_staged' })
|
||||
.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
return queryBuilder.getOne();
|
||||
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
||||
}
|
||||
|
||||
async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const heirerchicalQuery = `
|
||||
WITH RECURSIVE cte_query AS
|
||||
(
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
1 as depth,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
WHERE
|
||||
b.block_hash = $1
|
||||
UNION ALL
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
c.depth + 1,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i
|
||||
ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
INNER JOIN
|
||||
cte_query c ON c.parent_hash = b.block_hash
|
||||
WHERE
|
||||
c.depth < $3
|
||||
)
|
||||
SELECT
|
||||
block_number, id, kind
|
||||
FROM
|
||||
cte_query
|
||||
ORDER BY block_number DESC, id DESC
|
||||
`;
|
||||
|
||||
// Fetching block and id for previous IPLDBlock in frothy region.
|
||||
const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
|
||||
const latestRequiredResult = kind
|
||||
? queryResult.find((obj: any) => obj.kind === kind)
|
||||
: queryResult.find((obj: any) => obj.id);
|
||||
|
||||
let result: IPLDBlock | undefined;
|
||||
if (latestRequiredResult) {
|
||||
result = await queryRunner.manager.findOne(IPLDBlock, { id: latestRequiredResult.id }, { relations: ['block'] });
|
||||
} else {
|
||||
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
|
||||
// Filter out IPLDBlocks from pruned blocks.
|
||||
const canonicalBlockNumber = queryResult.pop().block_number + 1;
|
||||
|
||||
let queryBuilder = queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
result = await queryBuilder.getOne();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Fetch all diff IPLDBlocks after the specified checkpoint.
|
||||
async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise<IPLDBlock[]> {
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
|
||||
return repo.find({
|
||||
relations: ['block'],
|
||||
where: {
|
||||
contractAddress,
|
||||
kind: 'diff',
|
||||
block: {
|
||||
isPruned: false,
|
||||
blockNumber: MoreThan(checkpointBlockNumber)
|
||||
}
|
||||
},
|
||||
order: {
|
||||
block: 'ASC'
|
||||
}
|
||||
});
|
||||
return this._baseDatabase.getPrevIPLDBlock(repo, blockHash, contractAddress, kind);
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
// Fetch all diff IPLDBlocks after the specified block number.
|
||||
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
return repo.save(ipldBlock);
|
||||
|
||||
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
|
||||
}
|
||||
|
||||
async getHookStatus (queryRunner: QueryRunner): Promise<HookStatus | undefined> {
|
||||
const repo = queryRunner.manager.getRepository(HookStatus);
|
||||
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
const repo = dbTx.manager.getRepository(IPLDBlock);
|
||||
|
||||
return repo.findOne();
|
||||
return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
|
||||
}
|
||||
|
||||
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
||||
await this._baseDatabase.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind } });
|
||||
}
|
||||
|
||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||
const repo = this._conn.getRepository(HookStatus);
|
||||
|
||||
return this._baseDatabase.getHookStatus(repo);
|
||||
}
|
||||
|
||||
async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<HookStatus> {
|
||||
const repo = queryRunner.manager.getRepository(HookStatus);
|
||||
let entity = await repo.findOne();
|
||||
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
latestProcessedBlockNumber: blockNumber
|
||||
});
|
||||
}
|
||||
|
||||
if (force || blockNumber > entity.latestProcessedBlockNumber) {
|
||||
entity.latestProcessedBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return repo.save(entity);
|
||||
return this._baseDatabase.updateHookStatusProcessedBlock(repo, blockNumber, force);
|
||||
}
|
||||
|
||||
async getContracts (): Promise<Contract[]> {
|
||||
|
@ -80,7 +80,10 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
|
||||
await this.createHooksJob(kind);
|
||||
// If it's a pruning job: Create a hooks job.
|
||||
if (kind === JOB_KIND_PRUNE) {
|
||||
await this.createHooksJob();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -148,19 +151,18 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
}
|
||||
}
|
||||
|
||||
async createHooksJob (kind: string): Promise<void> {
|
||||
// If it's a pruning job: Create a hook job for the latest canonical block.
|
||||
if (kind === JOB_KIND_PRUNE) {
|
||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||
async createHooksJob (): Promise<void> {
|
||||
// Get the latest canonical block
|
||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HOOKS,
|
||||
{
|
||||
blockHash: latestCanonicalBlock.blockHash,
|
||||
blockNumber: latestCanonicalBlock.blockNumber
|
||||
}
|
||||
);
|
||||
}
|
||||
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HOOKS,
|
||||
{
|
||||
blockHash: latestCanonicalBlock.parentHash,
|
||||
blockNumber: latestCanonicalBlock.blockNumber - 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||
|
@ -6,7 +6,14 @@ import assert from 'assert';
|
||||
|
||||
import { Indexer, ResultEvent } from './indexer';
|
||||
|
||||
export async function createInitialCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise<void> {
|
||||
/**
|
||||
* Hook function to store an initial state.
|
||||
* @param indexer Indexer instance.
|
||||
* @param blockHash Hash of the concerned block.
|
||||
* @param contractAddress Address of the concerned contract.
|
||||
* @returns Data block to be stored.
|
||||
*/
|
||||
export async function createInitialState (indexer: Indexer, contractAddress: string, blockHash: string): Promise<any> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
assert(contractAddress);
|
||||
@ -16,14 +23,26 @@ export async function createInitialCheckpoint (indexer: Indexer, contractAddress
|
||||
state: {}
|
||||
};
|
||||
|
||||
await indexer.createCheckpoint(contractAddress, blockHash, ipldBlockData);
|
||||
return ipldBlockData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook function to create state diff.
|
||||
* @param indexer Indexer instance that contains methods to fetch the contract varaiable values.
|
||||
* @param blockHash Block hash of the concerned block.
|
||||
*/
|
||||
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook function to create state checkpoint
|
||||
* @param indexer Indexer instance.
|
||||
* @param contractAddress Address of the concerned contract.
|
||||
* @param blockHash Block hash of the concerned block.
|
||||
* @returns Whether to disable default checkpoint. If false, the state from this hook is updated with that from default checkpoint.
|
||||
*/
|
||||
export async function createStateCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise<boolean> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
@ -32,6 +51,11 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Event hook function.
|
||||
* @param indexer Indexer instance that contains methods to fetch and update the contract values in the database.
|
||||
* @param eventData ResultEvent object containing event information.
|
||||
*/
|
||||
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
|
||||
assert(indexer);
|
||||
assert(eventData);
|
||||
|
@ -7,16 +7,22 @@ import debug from 'debug';
|
||||
import { DeepPartial } from 'typeorm';
|
||||
import JSONbig from 'json-bigint';
|
||||
import { ethers } from 'ethers';
|
||||
import { sha256 } from 'multiformats/hashes/sha2';
|
||||
import { CID } from 'multiformats/cid';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { JsonFragment } from '@ethersproject/abi';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { StorageLayout } from '@vulcanize/solidity-mapper';
|
||||
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, UNKNOWN_EVENT_NAME, ServerConfig, JobQueue, BlockHeight } from '@vulcanize/util';
|
||||
import {
|
||||
EventInterface,
|
||||
IPLDIndexer as BaseIndexer,
|
||||
IndexerInterface,
|
||||
UNKNOWN_EVENT_NAME,
|
||||
ServerConfig,
|
||||
JobQueue,
|
||||
BlockHeight,
|
||||
IPFSClient
|
||||
} from '@vulcanize/util';
|
||||
import { GraphWatcher } from '@vulcanize/graph-node';
|
||||
|
||||
import { Database } from './database';
|
||||
@ -29,8 +35,7 @@ import { IPLDBlock } from './entity/IPLDBlock';
|
||||
import EdenNetworkArtifacts from './artifacts/EdenNetwork.json';
|
||||
import MerkleDistributorArtifacts from './artifacts/MerkleDistributor.json';
|
||||
import DistributorGovernanceArtifacts from './artifacts/DistributorGovernance.json';
|
||||
import { createInitialCheckpoint, handleEvent, createStateDiff, createStateCheckpoint } from './hooks';
|
||||
import { IPFSClient } from './ipfs';
|
||||
import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint } from './hooks';
|
||||
import { ProducerSet } from './entity/ProducerSet';
|
||||
import { Producer } from './entity/Producer';
|
||||
import { RewardSchedule } from './entity/RewardSchedule';
|
||||
@ -146,7 +151,8 @@ export class Indexer implements IndexerInterface {
|
||||
this._postgraphileClient = postgraphileClient;
|
||||
this._ethProvider = ethProvider;
|
||||
this._serverConfig = serverConfig;
|
||||
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
|
||||
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
|
||||
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue, this._ipfsClient);
|
||||
this._graphWatcher = graphWatcher;
|
||||
|
||||
this._abiMap = new Map();
|
||||
@ -178,8 +184,6 @@ export class Indexer implements IndexerInterface {
|
||||
this._storageLayoutMap.set(KIND_DISTRIBUTORGOVERNANCE, DistributorGovernanceStorageLayout);
|
||||
this._contractMap.set(KIND_DISTRIBUTORGOVERNANCE, new ethers.utils.Interface(DistributorGovernanceABI));
|
||||
|
||||
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
|
||||
|
||||
this._entityTypesMap = new Map();
|
||||
this._populateEntityTypesMap();
|
||||
|
||||
@ -242,315 +246,83 @@ export class Indexer implements IndexerInterface {
|
||||
};
|
||||
}
|
||||
|
||||
async pushToIPFS (data: any): Promise<void> {
|
||||
await this._baseIndexer.pushToIPFS(data);
|
||||
}
|
||||
|
||||
async processCanonicalBlock (job: any): Promise<void> {
|
||||
const { data: { blockHash } } = job;
|
||||
|
||||
// Finalize staged diff blocks if any.
|
||||
await this.finalizeDiffStaged(blockHash);
|
||||
await this._baseIndexer.finalizeDiffStaged(blockHash);
|
||||
|
||||
// Call custom stateDiff hook.
|
||||
await createStateDiff(this, blockHash);
|
||||
}
|
||||
|
||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Create a staged diff block.
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff_staged');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async finalizeDiffStaged (blockHash: string): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Get all the staged diff blocks for the given blockHash.
|
||||
const stagedBlocks = await this._db.getIPLDBlocks({ block, kind: 'diff_staged' });
|
||||
|
||||
// For each staged block, create a diff block.
|
||||
for (const stagedBlock of stagedBlocks) {
|
||||
const data = codec.decode(Buffer.from(stagedBlock.data));
|
||||
await this.createDiff(stagedBlock.contractAddress, stagedBlock.block.blockHash, data);
|
||||
}
|
||||
|
||||
// Remove all the staged diff blocks for current blockNumber.
|
||||
await this.removeStagedIPLDBlocks(block.blockNumber);
|
||||
}
|
||||
|
||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Fetch the latest checkpoint for the contract.
|
||||
const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint');
|
||||
|
||||
// There should be an initial checkpoint at least.
|
||||
// Return if initial checkpoint doesn't exist.
|
||||
if (!checkpoint) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if the latest checkpoint is in the same block.
|
||||
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.');
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async processCheckpoint (job: any): Promise<void> {
|
||||
// Return if checkpointInterval is <= 0.
|
||||
const checkpointInterval = this._serverConfig.checkpointInterval;
|
||||
if (checkpointInterval <= 0) return;
|
||||
|
||||
const { data: { blockHash, blockNumber } } = job;
|
||||
|
||||
// Get all the contracts.
|
||||
const contracts = await this._db.getContracts();
|
||||
|
||||
// For each contract, merge the diff till now to create a checkpoint.
|
||||
for (const contract of contracts) {
|
||||
// Check if contract has checkpointing on.
|
||||
if (contract.checkpoint) {
|
||||
// If a checkpoint doesn't already exist and blockNumber is equal to startingBlock, create an initial checkpoint.
|
||||
const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint');
|
||||
|
||||
if (!checkpointBlock) {
|
||||
if (blockNumber >= contract.startingBlock) {
|
||||
// Call initial checkpoint hook.
|
||||
await createInitialCheckpoint(this, contract.address, blockHash);
|
||||
}
|
||||
} else {
|
||||
await this.createCheckpoint(contract.address, blockHash, null, checkpointInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
const { data: { blockHash } } = job;
|
||||
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
||||
}
|
||||
|
||||
async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> {
|
||||
const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash);
|
||||
assert(checkpointBlockHash);
|
||||
|
||||
// Push checkpoint to IPFS if configured.
|
||||
if (this.isIPFSConfigured()) {
|
||||
const block = await this.getBlockProgress(checkpointBlockHash);
|
||||
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(checkpointIPLDBlocks.length <= 1);
|
||||
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
|
||||
|
||||
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
|
||||
await this.pushToIPFS(checkpointData);
|
||||
}
|
||||
|
||||
return checkpointBlockHash;
|
||||
return this._baseIndexer.processCLICheckpoint(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
||||
// Getting the current block.
|
||||
let currentBlock;
|
||||
|
||||
if (blockHash) {
|
||||
currentBlock = await this.getBlockProgress(blockHash);
|
||||
} else {
|
||||
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
|
||||
currentBlock = await this.getLatestHooksProcessedBlock();
|
||||
}
|
||||
|
||||
assert(currentBlock);
|
||||
|
||||
// Data is passed in case of initial checkpoint and checkpoint hook.
|
||||
// Assumption: There should be no events for the contract at the starting block.
|
||||
if (data) {
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If data is not passed, create from previous checkpoint and diffs after that.
|
||||
|
||||
// Make sure the block is marked complete.
|
||||
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
|
||||
|
||||
const hookStatus = await this.getHookStatus();
|
||||
assert(hookStatus);
|
||||
|
||||
// Make sure the hooks have been processed for the block.
|
||||
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
|
||||
|
||||
// Fetch the latest checkpoint for the contract.
|
||||
const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber);
|
||||
assert(checkpointBlock);
|
||||
|
||||
// Check (only if checkpointInterval is passed) if it is time for a new checkpoint.
|
||||
if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Call state checkpoint hook and check if default checkpoint is disabled.
|
||||
const disableDefaultCheckpoint = await createStateCheckpoint(this, contractAddress, currentBlock.blockHash);
|
||||
|
||||
if (disableDefaultCheckpoint) {
|
||||
// Return if default checkpoint is disabled.
|
||||
// Return block hash for checkpoint CLI.
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
const { block: { blockNumber: checkpointBlockNumber } } = checkpointBlock;
|
||||
|
||||
// Fetching all diff blocks after checkpoint.
|
||||
const diffBlocks = await this.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber);
|
||||
|
||||
const checkpointBlockData = codec.decode(Buffer.from(checkpointBlock.data)) as any;
|
||||
data = {
|
||||
state: checkpointBlockData.state
|
||||
};
|
||||
|
||||
for (const diffBlock of diffBlocks) {
|
||||
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
|
||||
data.state = _.merge(data.state, diff.state);
|
||||
}
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
getIPLDData (ipldBlock: IPLDBlock): any {
|
||||
return codec.decode(Buffer.from(ipldBlock.data));
|
||||
}
|
||||
|
||||
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlock[]> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
return this._db.getIPLDBlocks({ block });
|
||||
}
|
||||
|
||||
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
|
||||
const ipldBlocks = await this._db.getIPLDBlocks({ cid });
|
||||
|
||||
// There can be only one IPLDBlock with a particular cid.
|
||||
assert(ipldBlocks.length <= 1);
|
||||
|
||||
return ipldBlocks[0];
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||
}
|
||||
|
||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
||||
}
|
||||
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress, kind);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
return res;
|
||||
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlock[]> {
|
||||
return this._baseIndexer.getIPLDBlocksByHash(blockHash);
|
||||
}
|
||||
|
||||
async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise<IPLDBlock[]> {
|
||||
return this._db.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber);
|
||||
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
|
||||
return this._baseIndexer.getIPLDBlockByCid(cid);
|
||||
}
|
||||
|
||||
async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind: string):Promise<any> {
|
||||
assert(_.includes(['diff', 'checkpoint', 'diff_staged'], kind));
|
||||
|
||||
// Get an existing 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress.
|
||||
const currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(currentIPLDBlocks.length <= 1);
|
||||
const currentIPLDBlock = currentIPLDBlocks[0];
|
||||
|
||||
// Update currentIPLDBlock if it exists and is of same kind.
|
||||
let ipldBlock;
|
||||
if (currentIPLDBlock) {
|
||||
ipldBlock = currentIPLDBlock;
|
||||
|
||||
// Update the data field.
|
||||
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
|
||||
data = _.merge(oldData, data);
|
||||
} else {
|
||||
ipldBlock = new IPLDBlock();
|
||||
|
||||
// Fetch the parent IPLDBlock.
|
||||
const parentIPLDBlock = await this.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
|
||||
|
||||
// Setting the meta-data for an IPLDBlock (done only once per block).
|
||||
data.meta = {
|
||||
id: contractAddress,
|
||||
kind,
|
||||
parent: {
|
||||
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
|
||||
},
|
||||
ethBlock: {
|
||||
cid: {
|
||||
'/': block.cid
|
||||
},
|
||||
num: block.blockNumber
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Encoding the data using dag-cbor codec.
|
||||
const bytes = codec.encode(data);
|
||||
|
||||
// Calculating sha256 (multi)hash of the encoded data.
|
||||
const hash = await sha256.digest(bytes);
|
||||
|
||||
// Calculating the CID: v1, code: dag-cbor, hash.
|
||||
const cid = CID.create(1, codec.code, hash);
|
||||
|
||||
// Update ipldBlock with new data.
|
||||
ipldBlock = Object.assign(ipldBlock, {
|
||||
block,
|
||||
contractAddress,
|
||||
cid: cid.toString(),
|
||||
kind: data.meta.kind,
|
||||
data: Buffer.from(bytes)
|
||||
});
|
||||
|
||||
return ipldBlock;
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
return this._db.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async removeStagedIPLDBlocks (blockNumber: number): Promise<void> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await this._db.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind: 'diff_staged' } });
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
}
|
||||
|
||||
async pushToIPFS (data: any): Promise<void> {
|
||||
await this._ipfsClient.push(data);
|
||||
getIPLDData (ipldBlock: IPLDBlock): any {
|
||||
return this._baseIndexer.getIPLDData(ipldBlock);
|
||||
}
|
||||
|
||||
isIPFSConfigured (): boolean {
|
||||
const ipfsAddr = this._serverConfig.ipfsApiAddr;
|
||||
return this._baseIndexer.isIPFSConfigured();
|
||||
}
|
||||
|
||||
// Return false if ipfsAddr is undefined | null | empty string.
|
||||
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
|
||||
async createInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
||||
return createInitialState(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
|
||||
}
|
||||
|
||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
await this._baseIndexer.createDiff(contractAddress, blockHash, data);
|
||||
}
|
||||
|
||||
async createStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
||||
return this._baseIndexer.createCheckpoint(this, contractAddress, blockHash, data, checkpointInterval);
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
||||
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
||||
}
|
||||
|
||||
async getSubgraphEntity<Entity> (entity: new () => Entity, id: string, block?: BlockHeight): Promise<any> {
|
||||
@ -576,7 +348,10 @@ export class Indexer implements IndexerInterface {
|
||||
await this.triggerIndexingOnEvent(event);
|
||||
}
|
||||
|
||||
async processBlock (blockHash: string): Promise<void> {
|
||||
async processBlock (blockHash: string, blockNumber: number): Promise<void> {
|
||||
// Call a function to create initial state for contracts.
|
||||
await this._baseIndexer.createInit(this, blockHash, blockNumber);
|
||||
|
||||
// Call subgraph handler for block.
|
||||
await this._graphWatcher.handleBlock(blockHash);
|
||||
}
|
||||
@ -1008,20 +783,7 @@ export class Indexer implements IndexerInterface {
|
||||
}
|
||||
|
||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.getHookStatus(dbTx);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
return res;
|
||||
return this._db.getHookStatus();
|
||||
}
|
||||
|
||||
async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise<HookStatus> {
|
||||
@ -1055,12 +817,7 @@ export class Indexer implements IndexerInterface {
|
||||
const hookStatus = await this.getHookStatus();
|
||||
assert(hookStatus);
|
||||
|
||||
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
|
||||
|
||||
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
|
||||
assert(blocksAtHeight.length === 1);
|
||||
|
||||
return blocksAtHeight[0];
|
||||
return this._baseIndexer.getLatestHooksProcessedBlock(hookStatus);
|
||||
}
|
||||
|
||||
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
||||
|
@ -58,10 +58,10 @@ export class JobRunner {
|
||||
|
||||
await this._baseJobRunner.processBlock(job);
|
||||
|
||||
const { data: { kind, blockHash } } = job;
|
||||
const { data: { kind, blockHash, blockNumber } } = job;
|
||||
|
||||
if (kind === JOB_KIND_INDEX) {
|
||||
await this._indexer.processBlock(blockHash);
|
||||
await this._indexer.processBlock(blockHash, blockNumber);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ import debug from 'debug';
|
||||
import Decimal from 'decimal.js';
|
||||
import { GraphQLScalarType } from 'graphql';
|
||||
|
||||
import { BlockHeight } from '@vulcanize/util';
|
||||
import { BlockHeight, STATE_KIND_DIFF } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { EventWatcher } from './events';
|
||||
@ -217,7 +217,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
|
||||
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
||||
},
|
||||
|
||||
getState: async (_: any, { blockHash, contractAddress, kind = 'diff' }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||
getState: async (_: any, { blockHash, contractAddress, kind = STATE_KIND_DIFF }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||
log('getState', blockHash, contractAddress, kind);
|
||||
|
||||
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||
|
@ -9,7 +9,7 @@ import debug from 'debug';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue } from '@vulcanize/util';
|
||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, STATE_KIND_CHECKPOINT } from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
|
||||
@ -93,7 +93,7 @@ const main = async (): Promise<void> => {
|
||||
if (contract.checkpoint) {
|
||||
await indexer.createCheckpoint(contract.address, block.blockHash);
|
||||
|
||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber);
|
||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, STATE_KIND_CHECKPOINT, block.blockNumber);
|
||||
assert(ipldBlock);
|
||||
|
||||
const data = indexer.getIPLDData(ipldBlock);
|
||||
|
@ -11,7 +11,7 @@ import { PubSub } from 'apollo-server-express';
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients } from '@vulcanize/util';
|
||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, STATE_KIND_INIT, STATE_KIND_DIFF_STAGED } from '@vulcanize/util';
|
||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
|
||||
@ -108,11 +108,12 @@ export const main = async (): Promise<any> => {
|
||||
|
||||
ipldBlock.data = Buffer.from(codec.encode(ipldBlock.data));
|
||||
|
||||
await db.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
await indexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
// The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||
await indexer.removeStagedIPLDBlocks(block.blockNumber);
|
||||
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_INIT);
|
||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
||||
};
|
||||
|
||||
main().catch(err => {
|
||||
|
@ -3,10 +3,10 @@
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions, MoreThan } from 'typeorm';
|
||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
||||
import path from 'path';
|
||||
|
||||
import { Database as BaseDatabase, MAX_REORG_DEPTH, DatabaseInterface } from '@vulcanize/util';
|
||||
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface } from '@vulcanize/util';
|
||||
|
||||
import { Contract } from './entity/Contract';
|
||||
import { Event } from './entity/Event';
|
||||
@ -18,7 +18,7 @@ import { IPLDBlock } from './entity/IPLDBlock';
|
||||
import { GetMethod } from './entity/GetMethod';
|
||||
import { _Test } from './entity/_Test';
|
||||
|
||||
export class Database implements DatabaseInterface {
|
||||
export class Database implements IPLDDatabaseInterface {
|
||||
_config: ConnectionOptions;
|
||||
_conn!: Connection;
|
||||
_baseDatabase: BaseDatabase;
|
||||
@ -73,156 +73,55 @@ export class Database implements DatabaseInterface {
|
||||
return repo.save(entity);
|
||||
}
|
||||
|
||||
getNewIPLDBlock (): IPLDBlock {
|
||||
return new IPLDBlock();
|
||||
}
|
||||
|
||||
async getIPLDBlocks (where: FindConditions<IPLDBlock>): Promise<IPLDBlock[]> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
return repo.find({ where, relations: ['block'] });
|
||||
|
||||
return this._baseDatabase.getIPLDBlocks(repo, where);
|
||||
}
|
||||
|
||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
|
||||
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter out blocks after the provided block number.
|
||||
if (blockNumber) {
|
||||
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
|
||||
}
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: 'diff_staged' })
|
||||
.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
return queryBuilder.getOne();
|
||||
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
||||
}
|
||||
|
||||
async getPrevIPLDBlock (queryRunner: QueryRunner, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const heirerchicalQuery = `
|
||||
WITH RECURSIVE cte_query AS
|
||||
(
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
1 as depth,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
WHERE
|
||||
b.block_hash = $1
|
||||
UNION ALL
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
c.depth + 1,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i
|
||||
ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
INNER JOIN
|
||||
cte_query c ON c.parent_hash = b.block_hash
|
||||
WHERE
|
||||
c.depth < $3
|
||||
)
|
||||
SELECT
|
||||
block_number, id, kind
|
||||
FROM
|
||||
cte_query
|
||||
ORDER BY block_number DESC, id DESC
|
||||
`;
|
||||
|
||||
// Fetching block and id for previous IPLDBlock in frothy region.
|
||||
const queryResult = await queryRunner.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
|
||||
const latestRequiredResult = kind
|
||||
? queryResult.find((obj: any) => obj.kind === kind)
|
||||
: queryResult.find((obj: any) => obj.id);
|
||||
|
||||
let result: IPLDBlock | undefined;
|
||||
if (latestRequiredResult) {
|
||||
result = await queryRunner.manager.findOne(IPLDBlock, { id: latestRequiredResult.id }, { relations: ['block'] });
|
||||
} else {
|
||||
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
|
||||
// Filter out IPLDBlocks from pruned blocks.
|
||||
const canonicalBlockNumber = queryResult.pop().block_number + 1;
|
||||
|
||||
let queryBuilder = queryRunner.manager.createQueryBuilder(IPLDBlock, 'ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
result = await queryBuilder.getOne();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// Fetch all diff IPLDBlocks after the specified checkpoint.
|
||||
async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise<IPLDBlock[]> {
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
|
||||
return repo.find({
|
||||
relations: ['block'],
|
||||
where: {
|
||||
contractAddress,
|
||||
kind: 'diff',
|
||||
block: {
|
||||
isPruned: false,
|
||||
blockNumber: MoreThan(checkpointBlockNumber)
|
||||
}
|
||||
},
|
||||
order: {
|
||||
block: 'ASC'
|
||||
}
|
||||
});
|
||||
return this._baseDatabase.getPrevIPLDBlock(repo, blockHash, contractAddress, kind);
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
// Fetch all diff IPLDBlocks after the specified block number.
|
||||
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
|
||||
const repo = this._conn.getRepository(IPLDBlock);
|
||||
return repo.save(ipldBlock);
|
||||
|
||||
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
|
||||
}
|
||||
|
||||
async getHookStatus (queryRunner: QueryRunner): Promise<HookStatus | undefined> {
|
||||
const repo = queryRunner.manager.getRepository(HookStatus);
|
||||
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
const repo = dbTx.manager.getRepository(IPLDBlock);
|
||||
|
||||
return repo.findOne();
|
||||
return this._baseDatabase.saveOrUpdateIPLDBlock(repo, ipldBlock);
|
||||
}
|
||||
|
||||
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
||||
await this._baseDatabase.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind } });
|
||||
}
|
||||
|
||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||
const repo = this._conn.getRepository(HookStatus);
|
||||
|
||||
return this._baseDatabase.getHookStatus(repo);
|
||||
}
|
||||
|
||||
async updateHookStatusProcessedBlock (queryRunner: QueryRunner, blockNumber: number, force?: boolean): Promise<HookStatus> {
|
||||
const repo = queryRunner.manager.getRepository(HookStatus);
|
||||
let entity = await repo.findOne();
|
||||
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
latestProcessedBlockNumber: blockNumber
|
||||
});
|
||||
}
|
||||
|
||||
if (force || blockNumber > entity.latestProcessedBlockNumber) {
|
||||
entity.latestProcessedBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return repo.save(entity);
|
||||
return this._baseDatabase.updateHookStatusProcessedBlock(repo, blockNumber, force);
|
||||
}
|
||||
|
||||
async getContracts (): Promise<Contract[]> {
|
||||
|
@ -80,7 +80,10 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
|
||||
await this.createHooksJob(kind);
|
||||
// If it's a pruning job: Create a hooks job.
|
||||
if (kind === JOB_KIND_PRUNE) {
|
||||
await this.createHooksJob();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -148,19 +151,18 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
}
|
||||
}
|
||||
|
||||
async createHooksJob (kind: string): Promise<void> {
|
||||
// If it's a pruning job: Create a hook job for the latest canonical block.
|
||||
if (kind === JOB_KIND_PRUNE) {
|
||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||
async createHooksJob (): Promise<void> {
|
||||
// Get the latest canonical block
|
||||
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
|
||||
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HOOKS,
|
||||
{
|
||||
blockHash: latestCanonicalBlock.blockHash,
|
||||
blockNumber: latestCanonicalBlock.blockNumber
|
||||
}
|
||||
);
|
||||
}
|
||||
// Create a hooks job for parent block of latestCanonicalBlock because pruning for first block is skipped as it is assumed to be a canonical block.
|
||||
await this._jobQueue.pushJob(
|
||||
QUEUE_HOOKS,
|
||||
{
|
||||
blockHash: latestCanonicalBlock.parentHash,
|
||||
blockNumber: latestCanonicalBlock.blockNumber - 1
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
async createCheckpointJob (blockHash: string, blockNumber: number): Promise<void> {
|
||||
|
@ -6,7 +6,14 @@ import assert from 'assert';
|
||||
|
||||
import { Indexer, ResultEvent } from './indexer';
|
||||
|
||||
export async function createInitialCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise<void> {
|
||||
/**
|
||||
* Hook function to store an initial state.
|
||||
* @param indexer Indexer instance.
|
||||
* @param blockHash Hash of the concerned block.
|
||||
* @param contractAddress Address of the concerned contract.
|
||||
* @returns Data block to be stored.
|
||||
*/
|
||||
export async function createInitialState (indexer: Indexer, contractAddress: string, blockHash: string): Promise<any> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
assert(contractAddress);
|
||||
@ -16,14 +23,26 @@ export async function createInitialCheckpoint (indexer: Indexer, contractAddress
|
||||
state: {}
|
||||
};
|
||||
|
||||
await indexer.createCheckpoint(contractAddress, blockHash, ipldBlockData);
|
||||
return ipldBlockData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook function to create state diff.
|
||||
* @param indexer Indexer instance that contains methods to fetch the contract varaiable values.
|
||||
* @param blockHash Block hash of the concerned block.
|
||||
*/
|
||||
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
}
|
||||
|
||||
/**
|
||||
* Hook function to create state checkpoint
|
||||
* @param indexer Indexer instance.
|
||||
* @param contractAddress Address of the concerned contract.
|
||||
* @param blockHash Block hash of the concerned block.
|
||||
* @returns Whether to disable default checkpoint. If false, the state from this hook is updated with that from default checkpoint.
|
||||
*/
|
||||
export async function createStateCheckpoint (indexer: Indexer, contractAddress: string, blockHash: string): Promise<boolean> {
|
||||
assert(indexer);
|
||||
assert(blockHash);
|
||||
@ -32,6 +51,11 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Event hook function.
|
||||
* @param indexer Indexer instance that contains methods to fetch and update the contract values in the database.
|
||||
* @param eventData ResultEvent object containing event information.
|
||||
*/
|
||||
export async function handleEvent (indexer: Indexer, eventData: ResultEvent): Promise<void> {
|
||||
assert(indexer);
|
||||
assert(eventData);
|
||||
|
@ -7,16 +7,24 @@ import debug from 'debug';
|
||||
import { DeepPartial } from 'typeorm';
|
||||
import JSONbig from 'json-bigint';
|
||||
import { ethers } from 'ethers';
|
||||
import { sha256 } from 'multiformats/hashes/sha2';
|
||||
import { CID } from 'multiformats/cid';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { JsonFragment } from '@ethersproject/abi';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import { StorageLayout } from '@vulcanize/solidity-mapper';
|
||||
import { EventInterface, Indexer as BaseIndexer, IndexerInterface, ValueResult, UNKNOWN_EVENT_NAME, ServerConfig, updateStateForElementaryType, JobQueue, BlockHeight } from '@vulcanize/util';
|
||||
import {
|
||||
EventInterface,
|
||||
IPLDIndexer as BaseIndexer,
|
||||
IndexerInterface,
|
||||
ValueResult,
|
||||
UNKNOWN_EVENT_NAME,
|
||||
ServerConfig,
|
||||
updateStateForElementaryType,
|
||||
JobQueue,
|
||||
BlockHeight,
|
||||
IPFSClient
|
||||
} from '@vulcanize/util';
|
||||
import { GraphWatcher } from '@vulcanize/graph-node';
|
||||
|
||||
import { Database } from './database';
|
||||
@ -27,8 +35,7 @@ import { HookStatus } from './entity/HookStatus';
|
||||
import { BlockProgress } from './entity/BlockProgress';
|
||||
import { IPLDBlock } from './entity/IPLDBlock';
|
||||
import artifacts from './artifacts/Example.json';
|
||||
import { createInitialCheckpoint, handleEvent, createStateDiff, createStateCheckpoint } from './hooks';
|
||||
import { IPFSClient } from './ipfs';
|
||||
import { createInitialState, handleEvent, createStateDiff, createStateCheckpoint } from './hooks';
|
||||
import { Author } from './entity/Author';
|
||||
import { Blog } from './entity/Blog';
|
||||
import { Category } from './entity/Category';
|
||||
@ -103,7 +110,8 @@ export class Indexer implements IndexerInterface {
|
||||
this._postgraphileClient = postgraphileClient;
|
||||
this._ethProvider = ethProvider;
|
||||
this._serverConfig = serverConfig;
|
||||
this._baseIndexer = new BaseIndexer(this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue);
|
||||
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
|
||||
this._baseIndexer = new BaseIndexer(this._serverConfig, this._db, this._ethClient, this._postgraphileClient, this._ethProvider, jobQueue, this._ipfsClient);
|
||||
this._graphWatcher = graphWatcher;
|
||||
|
||||
const { abi, storageLayout } = artifacts;
|
||||
@ -116,8 +124,6 @@ export class Indexer implements IndexerInterface {
|
||||
|
||||
this._contract = new ethers.utils.Interface(this._abi);
|
||||
|
||||
this._ipfsClient = new IPFSClient(this._serverConfig.ipfsApiAddr);
|
||||
|
||||
this._entityTypesMap = new Map();
|
||||
this._populateEntityTypesMap();
|
||||
|
||||
@ -239,315 +245,83 @@ export class Indexer implements IndexerInterface {
|
||||
return result;
|
||||
}
|
||||
|
||||
async pushToIPFS (data: any): Promise<void> {
|
||||
await this._baseIndexer.pushToIPFS(data);
|
||||
}
|
||||
|
||||
async processCanonicalBlock (job: any): Promise<void> {
|
||||
const { data: { blockHash } } = job;
|
||||
|
||||
// Finalize staged diff blocks if any.
|
||||
await this.finalizeDiffStaged(blockHash);
|
||||
await this._baseIndexer.finalizeDiffStaged(blockHash);
|
||||
|
||||
// Call custom stateDiff hook.
|
||||
await createStateDiff(this, blockHash);
|
||||
}
|
||||
|
||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Create a staged diff block.
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff_staged');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async finalizeDiffStaged (blockHash: string): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Get all the staged diff blocks for the given blockHash.
|
||||
const stagedBlocks = await this._db.getIPLDBlocks({ block, kind: 'diff_staged' });
|
||||
|
||||
// For each staged block, create a diff block.
|
||||
for (const stagedBlock of stagedBlocks) {
|
||||
const data = codec.decode(Buffer.from(stagedBlock.data));
|
||||
await this.createDiff(stagedBlock.contractAddress, stagedBlock.block.blockHash, data);
|
||||
}
|
||||
|
||||
// Remove all the staged diff blocks for current blockNumber.
|
||||
await this.removeStagedIPLDBlocks(block.blockNumber);
|
||||
}
|
||||
|
||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Fetch the latest checkpoint for the contract.
|
||||
const checkpoint = await this.getLatestIPLDBlock(contractAddress, 'checkpoint');
|
||||
|
||||
// There should be an initial checkpoint at least.
|
||||
// Return if initial checkpoint doesn't exist.
|
||||
if (!checkpoint) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if the latest checkpoint is in the same block.
|
||||
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash.');
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, 'diff');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async processCheckpoint (job: any): Promise<void> {
|
||||
// Return if checkpointInterval is <= 0.
|
||||
const checkpointInterval = this._serverConfig.checkpointInterval;
|
||||
if (checkpointInterval <= 0) return;
|
||||
|
||||
const { data: { blockHash, blockNumber } } = job;
|
||||
|
||||
// Get all the contracts.
|
||||
const contracts = await this._db.getContracts();
|
||||
|
||||
// For each contract, merge the diff till now to create a checkpoint.
|
||||
for (const contract of contracts) {
|
||||
// Check if contract has checkpointing on.
|
||||
if (contract.checkpoint) {
|
||||
// If a checkpoint doesn't already exist and blockNumber is equal to startingBlock, create an initial checkpoint.
|
||||
const checkpointBlock = await this.getLatestIPLDBlock(contract.address, 'checkpoint');
|
||||
|
||||
if (!checkpointBlock) {
|
||||
if (blockNumber >= contract.startingBlock) {
|
||||
// Call initial checkpoint hook.
|
||||
await createInitialCheckpoint(this, contract.address, blockHash);
|
||||
}
|
||||
} else {
|
||||
await this.createCheckpoint(contract.address, blockHash, null, checkpointInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
const { data: { blockHash } } = job;
|
||||
await this._baseIndexer.processCheckpoint(this, blockHash, checkpointInterval);
|
||||
}
|
||||
|
||||
async processCLICheckpoint (contractAddress: string, blockHash?: string): Promise<string | undefined> {
|
||||
const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash);
|
||||
assert(checkpointBlockHash);
|
||||
|
||||
// Push checkpoint to IPFS if configured.
|
||||
if (this.isIPFSConfigured()) {
|
||||
const block = await this.getBlockProgress(checkpointBlockHash);
|
||||
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(checkpointIPLDBlocks.length <= 1);
|
||||
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
|
||||
|
||||
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
|
||||
await this.pushToIPFS(checkpointData);
|
||||
}
|
||||
|
||||
return checkpointBlockHash;
|
||||
return this._baseIndexer.processCLICheckpoint(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
||||
// Getting the current block.
|
||||
let currentBlock;
|
||||
|
||||
if (blockHash) {
|
||||
currentBlock = await this.getBlockProgress(blockHash);
|
||||
} else {
|
||||
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
|
||||
currentBlock = await this.getLatestHooksProcessedBlock();
|
||||
}
|
||||
|
||||
assert(currentBlock);
|
||||
|
||||
// Data is passed in case of initial checkpoint and checkpoint hook.
|
||||
// Assumption: There should be no events for the contract at the starting block.
|
||||
if (data) {
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If data is not passed, create from previous checkpoint and diffs after that.
|
||||
|
||||
// Make sure the block is marked complete.
|
||||
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
|
||||
|
||||
const hookStatus = await this.getHookStatus();
|
||||
assert(hookStatus);
|
||||
|
||||
// Make sure the hooks have been processed for the block.
|
||||
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
|
||||
|
||||
// Fetch the latest checkpoint for the contract.
|
||||
const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber);
|
||||
assert(checkpointBlock);
|
||||
|
||||
// Check (only if checkpointInterval is passed) if it is time for a new checkpoint.
|
||||
if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Call state checkpoint hook and check if default checkpoint is disabled.
|
||||
const disableDefaultCheckpoint = await createStateCheckpoint(this, contractAddress, currentBlock.blockHash);
|
||||
|
||||
if (disableDefaultCheckpoint) {
|
||||
// Return if default checkpoint is disabled.
|
||||
// Return block hash for checkpoint CLI.
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
const { block: { blockNumber: checkpointBlockNumber } } = checkpointBlock;
|
||||
|
||||
// Fetching all diff blocks after checkpoint.
|
||||
const diffBlocks = await this.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber);
|
||||
|
||||
const checkpointBlockData = codec.decode(Buffer.from(checkpointBlock.data)) as any;
|
||||
data = {
|
||||
state: checkpointBlockData.state
|
||||
};
|
||||
|
||||
for (const diffBlock of diffBlocks) {
|
||||
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
|
||||
data.state = _.merge(data.state, diff.state);
|
||||
}
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, 'checkpoint');
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
getIPLDData (ipldBlock: IPLDBlock): any {
|
||||
return codec.decode(Buffer.from(ipldBlock.data));
|
||||
}
|
||||
|
||||
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlock[]> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
return this._db.getIPLDBlocks({ block });
|
||||
}
|
||||
|
||||
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
|
||||
const ipldBlocks = await this._db.getIPLDBlocks({ cid });
|
||||
|
||||
// There can be only one IPLDBlock with a particular cid.
|
||||
assert(ipldBlocks.length <= 1);
|
||||
|
||||
return ipldBlocks[0];
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||
}
|
||||
|
||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
||||
}
|
||||
|
||||
async getPrevIPLDBlock (blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlock | undefined> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.getPrevIPLDBlock(dbTx, blockHash, contractAddress, kind);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
return res;
|
||||
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlock[]> {
|
||||
return this._baseIndexer.getIPLDBlocksByHash(blockHash);
|
||||
}
|
||||
|
||||
async getDiffIPLDBlocksByCheckpoint (contractAddress: string, checkpointBlockNumber: number): Promise<IPLDBlock[]> {
|
||||
return this._db.getDiffIPLDBlocksByCheckpoint(contractAddress, checkpointBlockNumber);
|
||||
async getIPLDBlockByCid (cid: string): Promise<IPLDBlock | undefined> {
|
||||
return this._baseIndexer.getIPLDBlockByCid(cid);
|
||||
}
|
||||
|
||||
async prepareIPLDBlock (block: BlockProgress, contractAddress: string, data: any, kind: string):Promise<any> {
|
||||
assert(_.includes(['diff', 'checkpoint', 'diff_staged'], kind));
|
||||
|
||||
// Get an existing 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress.
|
||||
const currentIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(currentIPLDBlocks.length <= 1);
|
||||
const currentIPLDBlock = currentIPLDBlocks[0];
|
||||
|
||||
// Update currentIPLDBlock if it exists and is of same kind.
|
||||
let ipldBlock;
|
||||
if (currentIPLDBlock) {
|
||||
ipldBlock = currentIPLDBlock;
|
||||
|
||||
// Update the data field.
|
||||
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
|
||||
data = _.merge(oldData, data);
|
||||
} else {
|
||||
ipldBlock = new IPLDBlock();
|
||||
|
||||
// Fetch the parent IPLDBlock.
|
||||
const parentIPLDBlock = await this.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
|
||||
|
||||
// Setting the meta-data for an IPLDBlock (done only once per block).
|
||||
data.meta = {
|
||||
id: contractAddress,
|
||||
kind,
|
||||
parent: {
|
||||
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
|
||||
},
|
||||
ethBlock: {
|
||||
cid: {
|
||||
'/': block.cid
|
||||
},
|
||||
num: block.blockNumber
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Encoding the data using dag-cbor codec.
|
||||
const bytes = codec.encode(data);
|
||||
|
||||
// Calculating sha256 (multi)hash of the encoded data.
|
||||
const hash = await sha256.digest(bytes);
|
||||
|
||||
// Calculating the CID: v1, code: dag-cbor, hash.
|
||||
const cid = CID.create(1, codec.code, hash);
|
||||
|
||||
// Update ipldBlock with new data.
|
||||
ipldBlock = Object.assign(ipldBlock, {
|
||||
block,
|
||||
contractAddress,
|
||||
cid: cid.toString(),
|
||||
kind: data.meta.kind,
|
||||
data: Buffer.from(bytes)
|
||||
});
|
||||
|
||||
return ipldBlock;
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
return this._db.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async removeStagedIPLDBlocks (blockNumber: number): Promise<void> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await this._db.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind: 'diff_staged' } });
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
}
|
||||
|
||||
async pushToIPFS (data: any): Promise<void> {
|
||||
await this._ipfsClient.push(data);
|
||||
getIPLDData (ipldBlock: IPLDBlock): any {
|
||||
return this._baseIndexer.getIPLDData(ipldBlock);
|
||||
}
|
||||
|
||||
isIPFSConfigured (): boolean {
|
||||
const ipfsAddr = this._serverConfig.ipfsApiAddr;
|
||||
return this._baseIndexer.isIPFSConfigured();
|
||||
}
|
||||
|
||||
// Return false if ipfsAddr is undefined | null | empty string.
|
||||
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
|
||||
async createInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
||||
return createInitialState(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
|
||||
}
|
||||
|
||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
await this._baseIndexer.createDiff(contractAddress, blockHash, data);
|
||||
}
|
||||
|
||||
async createStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||
}
|
||||
|
||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
||||
return this._baseIndexer.createCheckpoint(this, contractAddress, blockHash, data, checkpointInterval);
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
||||
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
||||
}
|
||||
|
||||
async getSubgraphEntity<Entity> (entity: new () => Entity, id: string, block: BlockHeight): Promise<Entity | undefined> {
|
||||
@ -573,7 +347,10 @@ export class Indexer implements IndexerInterface {
|
||||
await this.triggerIndexingOnEvent(event);
|
||||
}
|
||||
|
||||
async processBlock (blockHash: string): Promise<void> {
|
||||
async processBlock (blockHash: string, blockNumber: number): Promise<void> {
|
||||
// Call a function to create initial state for contracts.
|
||||
await this._baseIndexer.createInit(this, blockHash, blockNumber);
|
||||
|
||||
// Call subgraph handler for block.
|
||||
await this._graphWatcher.handleBlock(blockHash);
|
||||
}
|
||||
@ -607,20 +384,7 @@ export class Indexer implements IndexerInterface {
|
||||
}
|
||||
|
||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.getHookStatus(dbTx);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
return res;
|
||||
return this._db.getHookStatus();
|
||||
}
|
||||
|
||||
async updateHookStatusProcessedBlock (blockNumber: number, force?: boolean): Promise<HookStatus> {
|
||||
@ -654,12 +418,7 @@ export class Indexer implements IndexerInterface {
|
||||
const hookStatus = await this.getHookStatus();
|
||||
assert(hookStatus);
|
||||
|
||||
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
|
||||
|
||||
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
|
||||
assert(blocksAtHeight.length === 1);
|
||||
|
||||
return blocksAtHeight[0];
|
||||
return this._baseIndexer.getLatestHooksProcessedBlock(hookStatus);
|
||||
}
|
||||
|
||||
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
||||
|
@ -1,17 +0,0 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import { create, IPFSHTTPClient } from 'ipfs-http-client';
|
||||
|
||||
export class IPFSClient {
|
||||
_client: IPFSHTTPClient
|
||||
|
||||
constructor (url: string) {
|
||||
this._client = create({ url });
|
||||
}
|
||||
|
||||
async push (data: any): Promise<void> {
|
||||
await this._client.dag.put(data, { format: 'dag-cbor', hashAlg: 'sha2-256' });
|
||||
}
|
||||
}
|
@ -58,10 +58,10 @@ export class JobRunner {
|
||||
|
||||
await this._baseJobRunner.processBlock(job);
|
||||
|
||||
const { data: { kind, blockHash } } = job;
|
||||
const { data: { kind, blockHash, blockNumber } } = job;
|
||||
|
||||
if (kind === JOB_KIND_INDEX) {
|
||||
await this._indexer.processBlock(blockHash);
|
||||
await this._indexer.processBlock(blockHash, blockNumber);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -8,7 +8,7 @@ import debug from 'debug';
|
||||
import Decimal from 'decimal.js';
|
||||
import { GraphQLScalarType } from 'graphql';
|
||||
|
||||
import { ValueResult, BlockHeight } from '@vulcanize/util';
|
||||
import { ValueResult, BlockHeight, STATE_KIND_DIFF } from '@vulcanize/util';
|
||||
|
||||
import { Indexer } from './indexer';
|
||||
import { EventWatcher } from './events';
|
||||
@ -122,7 +122,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
|
||||
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
||||
},
|
||||
|
||||
getState: async (_: any, { blockHash, contractAddress, kind = 'diff' }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||
getState: async (_: any, { blockHash, contractAddress, kind = STATE_KIND_DIFF }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||
log('getState', blockHash, contractAddress, kind);
|
||||
|
||||
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||
|
@ -14,3 +14,6 @@ export * from './src/indexer';
|
||||
export * from './src/job-runner';
|
||||
export * from './src/ipld-helper';
|
||||
export * from './src/graph-decimal';
|
||||
export * from './src/ipld-indexer';
|
||||
export * from './src/ipld-database';
|
||||
export * from './src/ipfs';
|
||||
|
@ -23,3 +23,8 @@ export const UNKNOWN_EVENT_NAME = '__unknown__';
|
||||
|
||||
export const KIND_ACTIVE = 'active';
|
||||
export const KIND_LAZY = 'lazy';
|
||||
|
||||
export const STATE_KIND_INIT = 'init';
|
||||
export const STATE_KIND_DIFF_STAGED = 'diff_staged';
|
||||
export const STATE_KIND_DIFF = 'diff';
|
||||
export const STATE_KIND_CHECKPOINT = 'checkpoint';
|
||||
|
154
packages/util/src/ipld-database.ts
Normal file
154
packages/util/src/ipld-database.ts
Normal file
@ -0,0 +1,154 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import { FindConditions, MoreThan, Repository } from 'typeorm';
|
||||
|
||||
import { IPLDBlockInterface, HookStatusInterface } from './types';
|
||||
import { Database } from './database';
|
||||
import { MAX_REORG_DEPTH, STATE_KIND_DIFF_STAGED, STATE_KIND_DIFF } from './constants';
|
||||
|
||||
export class IPLDDatabase extends Database {
|
||||
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
|
||||
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter out blocks after the provided block number.
|
||||
if (blockNumber) {
|
||||
queryBuilder.andWhere('block.block_number <= :blockNumber', { blockNumber });
|
||||
}
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: STATE_KIND_DIFF_STAGED })
|
||||
.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
return queryBuilder.getOne();
|
||||
}
|
||||
|
||||
async getPrevIPLDBlock (repo: Repository<IPLDBlockInterface>, blockHash: string, contractAddress: string, kind?: string): Promise<IPLDBlockInterface | undefined> {
|
||||
const heirerchicalQuery = `
|
||||
WITH RECURSIVE cte_query AS
|
||||
(
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
1 as depth,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
WHERE
|
||||
b.block_hash = $1
|
||||
UNION ALL
|
||||
SELECT
|
||||
b.block_hash,
|
||||
b.block_number,
|
||||
b.parent_hash,
|
||||
c.depth + 1,
|
||||
i.id,
|
||||
i.kind
|
||||
FROM
|
||||
block_progress b
|
||||
LEFT JOIN
|
||||
ipld_block i
|
||||
ON i.block_id = b.id
|
||||
AND i.contract_address = $2
|
||||
INNER JOIN
|
||||
cte_query c ON c.parent_hash = b.block_hash
|
||||
WHERE
|
||||
c.depth < $3
|
||||
)
|
||||
SELECT
|
||||
block_number, id, kind
|
||||
FROM
|
||||
cte_query
|
||||
ORDER BY block_number DESC, id DESC
|
||||
`;
|
||||
|
||||
// Fetching block and id for previous IPLDBlock in frothy region.
|
||||
const queryResult = await repo.query(heirerchicalQuery, [blockHash, contractAddress, MAX_REORG_DEPTH]);
|
||||
const latestRequiredResult = kind
|
||||
? queryResult.find((obj: any) => obj.kind === kind)
|
||||
: queryResult.find((obj: any) => obj.id);
|
||||
|
||||
let result: IPLDBlockInterface | undefined;
|
||||
|
||||
if (latestRequiredResult) {
|
||||
result = await repo.findOne(latestRequiredResult.id, { relations: ['block'] });
|
||||
} else {
|
||||
// If IPLDBlock not found in frothy region get latest IPLDBlock in the pruned region.
|
||||
// Filter out IPLDBlocks from pruned blocks.
|
||||
const canonicalBlockNumber = queryResult.pop().block_number + 1;
|
||||
|
||||
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||
.where('block.is_pruned = false')
|
||||
.andWhere('ipld_block.contract_address = :contractAddress', { contractAddress })
|
||||
.andWhere('block.block_number <= :canonicalBlockNumber', { canonicalBlockNumber })
|
||||
.orderBy('block.block_number', 'DESC');
|
||||
|
||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||
queryBuilder = kind
|
||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
|
||||
|
||||
result = await queryBuilder.getOne();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
async getIPLDBlocks (repo: Repository<IPLDBlockInterface>, where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]> {
|
||||
return repo.find({ where, relations: ['block'] });
|
||||
}
|
||||
|
||||
async getDiffIPLDBlocksByBlocknumber (repo: Repository<IPLDBlockInterface>, contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]> {
|
||||
return repo.find({
|
||||
relations: ['block'],
|
||||
where: {
|
||||
contractAddress,
|
||||
kind: STATE_KIND_DIFF,
|
||||
block: {
|
||||
isPruned: false,
|
||||
blockNumber: MoreThan(blockNumber)
|
||||
}
|
||||
},
|
||||
order: {
|
||||
block: 'ASC'
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (repo: Repository<IPLDBlockInterface>, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
|
||||
return repo.save(ipldBlock);
|
||||
}
|
||||
|
||||
async getHookStatus (repo: Repository<HookStatusInterface>): Promise<HookStatusInterface | undefined> {
|
||||
return repo.findOne();
|
||||
}
|
||||
|
||||
async updateHookStatusProcessedBlock (repo: Repository<HookStatusInterface>, blockNumber: number, force?: boolean): Promise<HookStatusInterface> {
|
||||
let entity = await repo.findOne();
|
||||
|
||||
if (!entity) {
|
||||
entity = repo.create({
|
||||
latestProcessedBlockNumber: blockNumber
|
||||
});
|
||||
}
|
||||
|
||||
if (force || blockNumber > entity.latestProcessedBlockNumber) {
|
||||
entity.latestProcessedBlockNumber = blockNumber;
|
||||
}
|
||||
|
||||
return repo.save(entity);
|
||||
}
|
||||
}
|
396
packages/util/src/ipld-indexer.ts
Normal file
396
packages/util/src/ipld-indexer.ts
Normal file
@ -0,0 +1,396 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import { ethers } from 'ethers';
|
||||
import { sha256 } from 'multiformats/hashes/sha2';
|
||||
import { CID } from 'multiformats/cid';
|
||||
import _ from 'lodash';
|
||||
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
import * as codec from '@ipld/dag-cbor';
|
||||
|
||||
import {
|
||||
IPLDDatabaseInterface,
|
||||
IndexerInterface,
|
||||
BlockProgressInterface,
|
||||
IPLDBlockInterface,
|
||||
HookStatusInterface
|
||||
} from './types';
|
||||
import { Indexer } from './indexer';
|
||||
import { ServerConfig } from './config';
|
||||
import { IPFSClient } from './ipfs';
|
||||
import {
|
||||
STATE_KIND_INIT,
|
||||
STATE_KIND_DIFF_STAGED,
|
||||
STATE_KIND_DIFF,
|
||||
STATE_KIND_CHECKPOINT
|
||||
} from './constants';
|
||||
import { JobQueue } from './job-queue';
|
||||
|
||||
export class IPLDIndexer extends Indexer {
|
||||
_serverConfig: ServerConfig;
|
||||
_ipldDb: IPLDDatabaseInterface;
|
||||
_ipfsClient: IPFSClient;
|
||||
|
||||
constructor (
|
||||
serverConfig: ServerConfig,
|
||||
ipldDb: IPLDDatabaseInterface,
|
||||
ethClient: EthClient,
|
||||
postgraphileClient: EthClient,
|
||||
ethProvider: ethers.providers.BaseProvider,
|
||||
jobQueue: JobQueue,
|
||||
ipfsClient: IPFSClient
|
||||
) {
|
||||
super(ipldDb, ethClient, postgraphileClient, ethProvider, jobQueue);
|
||||
|
||||
this._serverConfig = serverConfig;
|
||||
this._ipldDb = ipldDb;
|
||||
this._ipfsClient = ipfsClient;
|
||||
}
|
||||
|
||||
getIPLDData (ipldBlock: IPLDBlockInterface): any {
|
||||
return codec.decode(Buffer.from(ipldBlock.data));
|
||||
}
|
||||
|
||||
async pushToIPFS (data: any): Promise<void> {
|
||||
await this._ipfsClient.push(data);
|
||||
}
|
||||
|
||||
isIPFSConfigured (): boolean {
|
||||
const ipfsAddr = this._serverConfig.ipfsApiAddr;
|
||||
|
||||
// Return false if ipfsAddr is undefined | null | empty string.
|
||||
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
|
||||
}
|
||||
|
||||
async getLatestHooksProcessedBlock (hookStatus: HookStatusInterface): Promise<BlockProgressInterface> {
|
||||
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
|
||||
|
||||
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
|
||||
assert(blocksAtHeight.length === 1);
|
||||
|
||||
return blocksAtHeight[0];
|
||||
}
|
||||
|
||||
async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise<void> {
|
||||
// Get all the contracts.
|
||||
assert(this._ipldDb.getContracts);
|
||||
const contracts = await this._ipldDb.getContracts();
|
||||
|
||||
// For each contract, merge the diff till now to create a checkpoint.
|
||||
for (const contract of contracts) {
|
||||
// Check if contract has checkpointing on.
|
||||
if (contract.checkpoint) {
|
||||
await this.createCheckpoint(indexer, contract.address, blockHash, null, checkpointInterval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise<string | undefined> {
|
||||
const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, blockHash);
|
||||
assert(checkpointBlockHash);
|
||||
|
||||
// Push checkpoint to IPFS if configured.
|
||||
if (this.isIPFSConfigured()) {
|
||||
const block = await this.getBlockProgress(checkpointBlockHash);
|
||||
const checkpointIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind: STATE_KIND_CHECKPOINT });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(checkpointIPLDBlocks.length <= 1);
|
||||
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
|
||||
|
||||
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
|
||||
await this.pushToIPFS(checkpointData);
|
||||
}
|
||||
|
||||
return checkpointBlockHash;
|
||||
}
|
||||
|
||||
async createInit (
|
||||
indexer: IndexerInterface,
|
||||
blockHash: string,
|
||||
blockNumber: number
|
||||
): Promise<void> {
|
||||
// Get all the contracts.
|
||||
assert(this._ipldDb.getContracts);
|
||||
const contracts = await this._ipldDb.getContracts();
|
||||
|
||||
// Create an initial state for each contract.
|
||||
for (const contract of contracts) {
|
||||
// Check if contract has checkpointing on.
|
||||
if (contract.checkpoint) {
|
||||
// Check if a 'diff' | 'checkpoint' ipldBlock already exists or blockNumber is < to startingBlock.
|
||||
const existingIpldBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, null);
|
||||
|
||||
if (existingIpldBlock || blockNumber < contract.startingBlock) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Call initial state hook.
|
||||
assert(indexer.createInitialState);
|
||||
const stateData = await indexer.createInitialState(contract.address, blockHash);
|
||||
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, STATE_KIND_INIT);
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
// Push initial state to IPFS if configured.
|
||||
if (this.isIPFSConfigured()) {
|
||||
const ipldData = this.getIPLDData(ipldBlock);
|
||||
await this.pushToIPFS(ipldData);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Create a staged diff block.
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, STATE_KIND_DIFF_STAGED);
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async finalizeDiffStaged (blockHash: string): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Get all the staged diff blocks for the given blockHash.
|
||||
const stagedBlocks = await this._ipldDb.getIPLDBlocks({ block, kind: STATE_KIND_DIFF_STAGED });
|
||||
|
||||
// For each staged block, create a diff block.
|
||||
for (const stagedBlock of stagedBlocks) {
|
||||
const data = codec.decode(Buffer.from(stagedBlock.data));
|
||||
await this.createDiff(stagedBlock.contractAddress, stagedBlock.block.blockHash, data);
|
||||
}
|
||||
|
||||
// Remove all the staged diff blocks for current blockNumber.
|
||||
await this.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
||||
}
|
||||
|
||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
// Fetch the latest checkpoint for the contract.
|
||||
const checkpoint = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_CHECKPOINT);
|
||||
|
||||
// There should be an initial state at least.
|
||||
if (!checkpoint) {
|
||||
// Fetch the initial state for the contract.
|
||||
const initState = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_INIT);
|
||||
assert(initState, 'No initial state found');
|
||||
} else {
|
||||
// Check if the latest checkpoint is in the same block.
|
||||
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash');
|
||||
}
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, STATE_KIND_DIFF);
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
}
|
||||
|
||||
async createCheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
||||
// Get current hookStatus.
|
||||
const hookStatus = await this._ipldDb.getHookStatus();
|
||||
assert(hookStatus);
|
||||
|
||||
// Getting the current block.
|
||||
let currentBlock;
|
||||
|
||||
if (blockHash) {
|
||||
currentBlock = await this.getBlockProgress(blockHash);
|
||||
} else {
|
||||
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
|
||||
currentBlock = await this.getLatestHooksProcessedBlock(hookStatus);
|
||||
}
|
||||
|
||||
assert(currentBlock);
|
||||
|
||||
// Data is passed in case of checkpoint hook.
|
||||
if (data) {
|
||||
// Create a checkpoint from the hook data without being concerned about diffs.
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, STATE_KIND_CHECKPOINT);
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If data is not passed, create from previous 'checkpoint' | 'init' and diffs after that.
|
||||
|
||||
// Make sure the block is marked complete.
|
||||
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
|
||||
|
||||
// Make sure the hooks have been processed for the block.
|
||||
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
|
||||
|
||||
// Call state checkpoint hook and check if default checkpoint is disabled.
|
||||
assert(indexer.createStateCheckpoint);
|
||||
const disableDefaultCheckpoint = await indexer.createStateCheckpoint(contractAddress, currentBlock.blockHash);
|
||||
|
||||
if (disableDefaultCheckpoint) {
|
||||
// Return if default checkpoint is disabled.
|
||||
// Return block hash for checkpoint CLI.
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
// Fetch the latest 'checkpoint' | 'init' for the contract.
|
||||
let prevNonDiffBlock: IPLDBlockInterface;
|
||||
let getDiffBlockNumber: number;
|
||||
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_CHECKPOINT, currentBlock.blockNumber);
|
||||
|
||||
if (checkpointBlock) {
|
||||
prevNonDiffBlock = checkpointBlock;
|
||||
getDiffBlockNumber = checkpointBlock.block.blockNumber;
|
||||
|
||||
// Check (only if checkpointInterval is passed) if it is time for a new checkpoint.
|
||||
if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// There should be an initial state at least.
|
||||
const initBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_INIT);
|
||||
assert(initBlock, 'No initial state found');
|
||||
|
||||
prevNonDiffBlock = initBlock;
|
||||
// Take block number previous to initial state block to get diffs after that.
|
||||
getDiffBlockNumber = initBlock.block.blockNumber - 1;
|
||||
}
|
||||
|
||||
// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
|
||||
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);
|
||||
|
||||
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
|
||||
data = {
|
||||
state: prevNonDiffBlockData.state
|
||||
};
|
||||
|
||||
for (const diffBlock of diffBlocks) {
|
||||
const diff = codec.decode(Buffer.from(diffBlock.data)) as any;
|
||||
data.state = _.merge(data.state, diff.state);
|
||||
}
|
||||
|
||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, STATE_KIND_CHECKPOINT);
|
||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||
|
||||
return currentBlock.blockHash;
|
||||
}
|
||||
|
||||
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: string):Promise<any> {
|
||||
assert(_.includes([
|
||||
STATE_KIND_INIT,
|
||||
STATE_KIND_DIFF_STAGED,
|
||||
STATE_KIND_DIFF,
|
||||
STATE_KIND_CHECKPOINT
|
||||
], kind));
|
||||
|
||||
// Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress.
|
||||
const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind });
|
||||
|
||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||
assert(currentIPLDBlocks.length <= 1);
|
||||
const currentIPLDBlock = currentIPLDBlocks[0];
|
||||
|
||||
// Update currentIPLDBlock of same kind if it exists.
|
||||
let ipldBlock;
|
||||
|
||||
if (currentIPLDBlock) {
|
||||
ipldBlock = currentIPLDBlock;
|
||||
|
||||
// Update the data field.
|
||||
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
|
||||
data = _.merge(oldData, data);
|
||||
} else {
|
||||
ipldBlock = this._ipldDb.getNewIPLDBlock();
|
||||
|
||||
// Fetch the parent IPLDBlock.
|
||||
const parentIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
|
||||
|
||||
// Setting the meta-data for an IPLDBlock (done only once per block).
|
||||
data.meta = {
|
||||
id: contractAddress,
|
||||
kind,
|
||||
parent: {
|
||||
'/': parentIPLDBlock ? parentIPLDBlock.cid : null
|
||||
},
|
||||
ethBlock: {
|
||||
cid: {
|
||||
'/': block.cid
|
||||
},
|
||||
num: block.blockNumber
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// Encoding the data using dag-cbor codec.
|
||||
const bytes = codec.encode(data);
|
||||
|
||||
// Calculating sha256 (multi)hash of the encoded data.
|
||||
const hash = await sha256.digest(bytes);
|
||||
|
||||
// Calculating the CID: v1, code: dag-cbor, hash.
|
||||
const cid = CID.create(1, codec.code, hash);
|
||||
|
||||
// Update ipldBlock with new data.
|
||||
ipldBlock = Object.assign(ipldBlock, {
|
||||
block,
|
||||
contractAddress,
|
||||
cid: cid.toString(),
|
||||
kind: data.meta.kind,
|
||||
data: Buffer.from(bytes)
|
||||
});
|
||||
|
||||
return ipldBlock;
|
||||
}
|
||||
|
||||
async getIPLDBlocksByHash (blockHash: string): Promise<IPLDBlockInterface[]> {
|
||||
const block = await this.getBlockProgress(blockHash);
|
||||
assert(block);
|
||||
|
||||
return this._ipldDb.getIPLDBlocks({ block });
|
||||
}
|
||||
|
||||
async getIPLDBlockByCid (cid: string): Promise<IPLDBlockInterface | undefined> {
|
||||
const ipldBlocks = await this._ipldDb.getIPLDBlocks({ cid });
|
||||
|
||||
// There can be only one IPLDBlock with a particular cid.
|
||||
assert(ipldBlocks.length <= 1);
|
||||
|
||||
return ipldBlocks[0];
|
||||
}
|
||||
|
||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._ipldDb.saveOrUpdateIPLDBlock(dbTx, ipldBlock);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await this._ipldDb.removeIPLDBlocks(dbTx, blockNumber, kind);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
}
|
||||
}
|
@ -31,6 +31,11 @@ export interface SyncStatusInterface {
|
||||
latestCanonicalBlockNumber: number;
|
||||
}
|
||||
|
||||
export interface HookStatusInterface {
|
||||
id: number;
|
||||
latestProcessedBlockNumber: number
|
||||
}
|
||||
|
||||
export interface EventInterface {
|
||||
id: number;
|
||||
block: BlockProgressInterface;
|
||||
@ -51,6 +56,15 @@ export interface ContractInterface {
|
||||
checkpoint: boolean;
|
||||
}
|
||||
|
||||
export interface IPLDBlockInterface {
|
||||
id: number;
|
||||
block: BlockProgressInterface;
|
||||
contractAddress: string;
|
||||
cid: string;
|
||||
kind: string;
|
||||
data: Buffer;
|
||||
}
|
||||
|
||||
export interface IndexerInterface {
|
||||
getBlockProgress (blockHash: string): Promise<BlockProgressInterface | undefined>
|
||||
getBlockProgressEntities (where: FindConditions<BlockProgressInterface>, options: FindManyOptions<BlockProgressInterface>): Promise<BlockProgressInterface[]>
|
||||
@ -72,9 +86,11 @@ export interface IndexerInterface {
|
||||
parseEventNameAndArgs?: (kind: string, logObj: any) => any;
|
||||
isWatchedContract?: (address: string) => Promise<ContractInterface | undefined>;
|
||||
cacheContract?: (contract: ContractInterface) => void;
|
||||
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
|
||||
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
|
||||
getEntityTypesMap?: () => Map<string, { [key: string]: string }>
|
||||
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
|
||||
createInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
|
||||
createStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
|
||||
}
|
||||
|
||||
export interface EventWatcherInterface {
|
||||
@ -108,3 +124,13 @@ export interface DatabaseInterface {
|
||||
getContracts?: () => Promise<ContractInterface[]>
|
||||
saveContract?: (queryRunner: QueryRunner, contractAddress: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<ContractInterface>
|
||||
}
|
||||
|
||||
export interface IPLDDatabaseInterface extends DatabaseInterface {
|
||||
getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined>
|
||||
getIPLDBlocks (where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]>
|
||||
getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]>
|
||||
getNewIPLDBlock (): IPLDBlockInterface
|
||||
removeIPLDBlocks(dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void>
|
||||
saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface>
|
||||
getHookStatus (): Promise<HookStatusInterface | undefined>
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user