mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-24 03:59:06 +00:00
Performance improvements for IPLDBlocks in eden watcher (#80)
* Avoid fetching contracts while creating a checkpoint * Use enum for state kind and cache ipld status * Avoid fetching block twice while finalizing a staged diff IPLDBlock * Create checkpoints at fixed block numbers and refactor checkpointing code * Avoid calling block handler until start block is reached * Use delete while finalizing staged diff IPLDBlocks * Add a check to ensure hooks job is created only once * Avoid check for initial state while creating a checkpoint
This commit is contained in:
parent
952f68ee58
commit
560df57ac7
@ -9,7 +9,7 @@ import debug from 'debug';
|
|||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, STATE_KIND_CHECKPOINT } from '@vulcanize/util';
|
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
|
||||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||||
import * as codec from '@ipld/dag-cbor';
|
import * as codec from '@ipld/dag-cbor';
|
||||||
|
|
||||||
@ -94,7 +94,7 @@ const main = async (): Promise<void> => {
|
|||||||
if (contract.checkpoint) {
|
if (contract.checkpoint) {
|
||||||
await indexer.createCheckpoint(contract.address, block.blockHash);
|
await indexer.createCheckpoint(contract.address, block.blockHash);
|
||||||
|
|
||||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, STATE_KIND_CHECKPOINT, block.blockNumber);
|
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
|
||||||
assert(ipldBlock);
|
assert(ipldBlock);
|
||||||
|
|
||||||
const data = indexer.getIPLDData(ipldBlock);
|
const data = indexer.getIPLDData(ipldBlock);
|
||||||
|
@ -11,7 +11,7 @@ import { PubSub } from 'apollo-server-express';
|
|||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, STATE_KIND_INIT, STATE_KIND_DIFF_STAGED } from '@vulcanize/util';
|
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util';
|
||||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||||
import * as codec from '@ipld/dag-cbor';
|
import * as codec from '@ipld/dag-cbor';
|
||||||
|
|
||||||
@ -112,8 +112,8 @@ export const main = async (): Promise<any> => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_INIT);
|
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
|
||||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().catch(err => {
|
main().catch(err => {
|
||||||
|
@ -6,7 +6,7 @@ import assert from 'assert';
|
|||||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, Where } from '@vulcanize/util';
|
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@vulcanize/util';
|
||||||
|
|
||||||
import { Contract } from './entity/Contract';
|
import { Contract } from './entity/Contract';
|
||||||
import { Event } from './entity/Event';
|
import { Event } from './entity/Event';
|
||||||
@ -49,7 +49,7 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
return this._baseDatabase.getIPLDBlocks(repo, where);
|
return this._baseDatabase.getIPLDBlocks(repo, where);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||||
const repo = this._conn.getRepository(IPLDBlock);
|
const repo = this._conn.getRepository(IPLDBlock);
|
||||||
|
|
||||||
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
||||||
@ -75,7 +75,9 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
||||||
await this._baseDatabase.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind } });
|
const repo = dbTx.manager.getRepository(IPLDBlock);
|
||||||
|
|
||||||
|
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||||
|
@ -3,6 +3,9 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
|
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
|
||||||
|
|
||||||
|
import { StateKind } from '@vulcanize/util';
|
||||||
|
|
||||||
import { BlockProgress } from './BlockProgress';
|
import { BlockProgress } from './BlockProgress';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@ -22,8 +25,11 @@ export class IPLDBlock {
|
|||||||
@Column('varchar')
|
@Column('varchar')
|
||||||
cid!: string;
|
cid!: string;
|
||||||
|
|
||||||
@Column('varchar')
|
@Column({
|
||||||
kind!: string;
|
type: 'enum',
|
||||||
|
enum: StateKind
|
||||||
|
})
|
||||||
|
kind!: StateKind;
|
||||||
|
|
||||||
@Column('bytea')
|
@Column('bytea')
|
||||||
data!: Buffer;
|
data!: Buffer;
|
||||||
|
@ -23,6 +23,7 @@ export async function createInitialState (indexer: Indexer, contractAddress: str
|
|||||||
state: {}
|
state: {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Return initial state data to be saved.
|
||||||
return ipldBlockData;
|
return ipldBlockData;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,6 +35,8 @@ export async function createInitialState (indexer: Indexer, contractAddress: str
|
|||||||
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
||||||
assert(indexer);
|
assert(indexer);
|
||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
|
|
||||||
|
// Use indexer.createStateDiff() method to create a custom diff.
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,6 +51,8 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
|
|||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
assert(contractAddress);
|
assert(contractAddress);
|
||||||
|
|
||||||
|
// Use indexer.createStateCheckpoint() method to create a custom checkpoint.
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,7 +22,8 @@ import {
|
|||||||
Where,
|
Where,
|
||||||
QueryOptions,
|
QueryOptions,
|
||||||
BlockHeight,
|
BlockHeight,
|
||||||
IPFSClient
|
IPFSClient,
|
||||||
|
StateKind
|
||||||
} from '@vulcanize/util';
|
} from '@vulcanize/util';
|
||||||
import { GraphWatcher } from '@vulcanize/graph-node';
|
import { GraphWatcher } from '@vulcanize/graph-node';
|
||||||
|
|
||||||
@ -194,6 +195,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
|
|
||||||
async init (): Promise<void> {
|
async init (): Promise<void> {
|
||||||
await this._baseIndexer.fetchContracts();
|
await this._baseIndexer.fetchContracts();
|
||||||
|
await this._baseIndexer.fetchIPLDStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
getResultEvent (event: Event): ResultEvent {
|
getResultEvent (event: Event): ResultEvent {
|
||||||
@ -255,6 +257,16 @@ export class Indexer implements IndexerInterface {
|
|||||||
await this._baseIndexer.pushToIPFS(data);
|
await this._baseIndexer.pushToIPFS(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
||||||
|
// Call initial state hook.
|
||||||
|
return createInitialState(this, contractAddress, blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
||||||
|
// Call checkpoint hook.
|
||||||
|
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
async processCanonicalBlock (job: any): Promise<void> {
|
async processCanonicalBlock (job: any): Promise<void> {
|
||||||
const { data: { blockHash } } = job;
|
const { data: { blockHash } } = job;
|
||||||
|
|
||||||
@ -291,7 +303,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||||
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,10 +323,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._baseIndexer.isIPFSConfigured();
|
return this._baseIndexer.isIPFSConfigured();
|
||||||
}
|
}
|
||||||
|
|
||||||
async createInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
// Method used to create auto diffs (diff_staged).
|
||||||
return createInitialState(this, contractAddress, blockHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
console.time('time:indexer#createDiffStaged-auto_diff');
|
console.time('time:indexer#createDiffStaged-auto_diff');
|
||||||
|
|
||||||
@ -323,23 +332,35 @@ export class Indexer implements IndexerInterface {
|
|||||||
console.timeEnd('time:indexer#createDiffStaged-auto_diff');
|
console.timeEnd('time:indexer#createDiffStaged-auto_diff');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Method to be used by createStateDiff hook.
|
||||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
await this._baseIndexer.createDiff(contractAddress, blockHash, data);
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
await this._baseIndexer.createDiff(contractAddress, block, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async createStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
// Method to be used by createStateCheckpoint hook.
|
||||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
async createStateCheckpoint (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
return this._baseIndexer.createStateCheckpoint(contractAddress, block, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
// Method to be used by checkpoint CLI.
|
||||||
return this._baseIndexer.createCheckpoint(this, contractAddress, blockHash, data, checkpointInterval);
|
async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> {
|
||||||
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
return this._baseIndexer.createCheckpoint(this, contractAddress, block);
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||||
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise<void> {
|
||||||
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -844,13 +865,12 @@ export class Indexer implements IndexerInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
|
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
|
||||||
const hookStatus = await this.getHookStatus();
|
return this._baseIndexer.getLatestHooksProcessedBlock();
|
||||||
assert(hookStatus);
|
|
||||||
|
|
||||||
return this._baseIndexer.getLatestHooksProcessedBlock(hookStatus);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
||||||
|
this._baseIndexer.updateIPLDStatusMap(address, {});
|
||||||
|
|
||||||
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
|
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,11 +78,19 @@ export class JobRunner {
|
|||||||
|
|
||||||
const hookStatus = await this._indexer.getHookStatus();
|
const hookStatus = await this._indexer.getHookStatus();
|
||||||
|
|
||||||
if (hookStatus && hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
|
if (hookStatus) {
|
||||||
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
if (hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
|
||||||
log(message);
|
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
throw new Error(message);
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hookStatus.latestProcessedBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`Hooks for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await this._indexer.processCanonicalBlock(job);
|
await this._indexer.processCanonicalBlock(job);
|
||||||
|
@ -8,7 +8,7 @@ import debug from 'debug';
|
|||||||
import Decimal from 'decimal.js';
|
import Decimal from 'decimal.js';
|
||||||
import { GraphQLScalarType } from 'graphql';
|
import { GraphQLScalarType } from 'graphql';
|
||||||
|
|
||||||
import { BlockHeight, STATE_KIND_DIFF } from '@vulcanize/util';
|
import { BlockHeight, StateKind } from '@vulcanize/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { EventWatcher } from './events';
|
import { EventWatcher } from './events';
|
||||||
@ -217,7 +217,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
|
|||||||
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
||||||
},
|
},
|
||||||
|
|
||||||
getState: async (_: any, { blockHash, contractAddress, kind = STATE_KIND_DIFF }: { blockHash: string, contractAddress: string, kind: string }) => {
|
getState: async (_: any, { blockHash, contractAddress, kind = StateKind.Diff }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||||
log('getState', blockHash, contractAddress, kind);
|
log('getState', blockHash, contractAddress, kind);
|
||||||
|
|
||||||
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||||
|
@ -172,8 +172,8 @@ export class GraphWatcher {
|
|||||||
|
|
||||||
// Call block handler(s) for each contract.
|
// Call block handler(s) for each contract.
|
||||||
for (const dataSource of this._dataSources) {
|
for (const dataSource of this._dataSources) {
|
||||||
// Check if block handler(s) are configured.
|
// Check if block handler(s) are configured and start block has been reached.
|
||||||
if (!dataSource.mapping.blockHandlers) {
|
if (!dataSource.mapping.blockHandlers || blockData.blockNumber < dataSource.source.startBlock) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ import debug from 'debug';
|
|||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, STATE_KIND_CHECKPOINT } from '@vulcanize/util';
|
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
|
||||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||||
import * as codec from '@ipld/dag-cbor';
|
import * as codec from '@ipld/dag-cbor';
|
||||||
|
|
||||||
@ -94,7 +94,7 @@ const main = async (): Promise<void> => {
|
|||||||
if (contract.checkpoint) {
|
if (contract.checkpoint) {
|
||||||
await indexer.createCheckpoint(contract.address, block.blockHash);
|
await indexer.createCheckpoint(contract.address, block.blockHash);
|
||||||
|
|
||||||
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, STATE_KIND_CHECKPOINT, block.blockNumber);
|
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
|
||||||
assert(ipldBlock);
|
assert(ipldBlock);
|
||||||
|
|
||||||
const data = indexer.getIPLDData(ipldBlock);
|
const data = indexer.getIPLDData(ipldBlock);
|
||||||
|
@ -11,7 +11,7 @@ import { PubSub } from 'apollo-server-express';
|
|||||||
import fs from 'fs';
|
import fs from 'fs';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, STATE_KIND_INIT, STATE_KIND_DIFF_STAGED } from '@vulcanize/util';
|
import { getConfig, fillBlocks, JobQueue, DEFAULT_CONFIG_PATH, Config, initClients, StateKind } from '@vulcanize/util';
|
||||||
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
|
||||||
import * as codec from '@ipld/dag-cbor';
|
import * as codec from '@ipld/dag-cbor';
|
||||||
|
|
||||||
@ -112,8 +112,8 @@ export const main = async (): Promise<any> => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
// The 'diff_staged' and 'init' IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
|
||||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_INIT);
|
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.Init);
|
||||||
await indexer.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
await indexer.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
|
||||||
};
|
};
|
||||||
|
|
||||||
main().catch(err => {
|
main().catch(err => {
|
||||||
|
@ -6,7 +6,7 @@ import assert from 'assert';
|
|||||||
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
import { Connection, ConnectionOptions, DeepPartial, FindConditions, QueryRunner, FindManyOptions } from 'typeorm';
|
||||||
import path from 'path';
|
import path from 'path';
|
||||||
|
|
||||||
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, Where } from '@vulcanize/util';
|
import { IPLDDatabase as BaseDatabase, IPLDDatabaseInterface, QueryOptions, StateKind, Where } from '@vulcanize/util';
|
||||||
|
|
||||||
import { Contract } from './entity/Contract';
|
import { Contract } from './entity/Contract';
|
||||||
import { Event } from './entity/Event';
|
import { Event } from './entity/Event';
|
||||||
@ -83,7 +83,7 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
return this._baseDatabase.getIPLDBlocks(repo, where);
|
return this._baseDatabase.getIPLDBlocks(repo, where);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||||
const repo = this._conn.getRepository(IPLDBlock);
|
const repo = this._conn.getRepository(IPLDBlock);
|
||||||
|
|
||||||
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
return this._baseDatabase.getLatestIPLDBlock(repo, contractAddress, kind, blockNumber);
|
||||||
@ -109,7 +109,9 @@ export class Database implements IPLDDatabaseInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
async removeIPLDBlocks (dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void> {
|
||||||
await this._baseDatabase.removeEntities(dbTx, IPLDBlock, { relations: ['block'], where: { block: { blockNumber }, kind } });
|
const repo = dbTx.manager.getRepository(IPLDBlock);
|
||||||
|
|
||||||
|
await this._baseDatabase.removeIPLDBlocks(repo, blockNumber, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getHookStatus (): Promise<HookStatus | undefined> {
|
async getHookStatus (): Promise<HookStatus | undefined> {
|
||||||
|
@ -3,6 +3,9 @@
|
|||||||
//
|
//
|
||||||
|
|
||||||
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
|
import { Entity, PrimaryGeneratedColumn, Column, Index, ManyToOne } from 'typeorm';
|
||||||
|
|
||||||
|
import { StateKind } from '@vulcanize/util';
|
||||||
|
|
||||||
import { BlockProgress } from './BlockProgress';
|
import { BlockProgress } from './BlockProgress';
|
||||||
|
|
||||||
@Entity()
|
@Entity()
|
||||||
@ -22,8 +25,11 @@ export class IPLDBlock {
|
|||||||
@Column('varchar')
|
@Column('varchar')
|
||||||
cid!: string;
|
cid!: string;
|
||||||
|
|
||||||
@Column('varchar')
|
@Column({
|
||||||
kind!: string;
|
type: 'enum',
|
||||||
|
enum: StateKind
|
||||||
|
})
|
||||||
|
kind!: StateKind;
|
||||||
|
|
||||||
@Column('bytea')
|
@Column('bytea')
|
||||||
data!: Buffer;
|
data!: Buffer;
|
||||||
|
@ -23,6 +23,7 @@ export async function createInitialState (indexer: Indexer, contractAddress: str
|
|||||||
state: {}
|
state: {}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Return initial state data to be saved.
|
||||||
return ipldBlockData;
|
return ipldBlockData;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,6 +35,8 @@ export async function createInitialState (indexer: Indexer, contractAddress: str
|
|||||||
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
export async function createStateDiff (indexer: Indexer, blockHash: string): Promise<void> {
|
||||||
assert(indexer);
|
assert(indexer);
|
||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
|
|
||||||
|
// Use indexer.createStateDiff() method to create a custom diff.
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -48,6 +51,8 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
|
|||||||
assert(blockHash);
|
assert(blockHash);
|
||||||
assert(contractAddress);
|
assert(contractAddress);
|
||||||
|
|
||||||
|
// Use indexer.createStateCheckpoint() method to create a custom checkpoint.
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,8 @@ import {
|
|||||||
Where,
|
Where,
|
||||||
QueryOptions,
|
QueryOptions,
|
||||||
BlockHeight,
|
BlockHeight,
|
||||||
IPFSClient
|
IPFSClient,
|
||||||
|
StateKind
|
||||||
} from '@vulcanize/util';
|
} from '@vulcanize/util';
|
||||||
import { GraphWatcher } from '@vulcanize/graph-node';
|
import { GraphWatcher } from '@vulcanize/graph-node';
|
||||||
|
|
||||||
@ -134,6 +135,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
|
|
||||||
async init (): Promise<void> {
|
async init (): Promise<void> {
|
||||||
await this._baseIndexer.fetchContracts();
|
await this._baseIndexer.fetchContracts();
|
||||||
|
await this._baseIndexer.fetchIPLDStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
getResultEvent (event: Event): ResultEvent {
|
getResultEvent (event: Event): ResultEvent {
|
||||||
@ -254,6 +256,16 @@ export class Indexer implements IndexerInterface {
|
|||||||
await this._baseIndexer.pushToIPFS(data);
|
await this._baseIndexer.pushToIPFS(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async processInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
||||||
|
// Call initial state hook.
|
||||||
|
return createInitialState(this, contractAddress, blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
async processStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
||||||
|
// Call checkpoint hook.
|
||||||
|
return createStateCheckpoint(this, contractAddress, blockHash);
|
||||||
|
}
|
||||||
|
|
||||||
async processCanonicalBlock (job: any): Promise<void> {
|
async processCanonicalBlock (job: any): Promise<void> {
|
||||||
const { data: { blockHash } } = job;
|
const { data: { blockHash } } = job;
|
||||||
|
|
||||||
@ -281,7 +293,7 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
return this._db.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
async getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlock | undefined> {
|
||||||
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
return this._db.getLatestIPLDBlock(contractAddress, kind, blockNumber);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -301,31 +313,40 @@ export class Indexer implements IndexerInterface {
|
|||||||
return this._baseIndexer.isIPFSConfigured();
|
return this._baseIndexer.isIPFSConfigured();
|
||||||
}
|
}
|
||||||
|
|
||||||
async createInitialState (contractAddress: string, blockHash: string): Promise<any> {
|
// Method used to create auto diffs (diff_staged).
|
||||||
return createInitialState(this, contractAddress, blockHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
async createDiffStaged (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
|
await this._baseIndexer.createDiffStaged(contractAddress, blockHash, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Method to be used by createStateDiff hook.
|
||||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
await this._baseIndexer.createDiff(contractAddress, blockHash, data);
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
await this._baseIndexer.createDiff(contractAddress, block, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async createStateCheckpoint (contractAddress: string, blockHash: string): Promise<boolean> {
|
// Method to be used by createStateCheckpoint hook.
|
||||||
return createStateCheckpoint(this, contractAddress, blockHash);
|
async createStateCheckpoint (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
||||||
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
return this._baseIndexer.createStateCheckpoint(contractAddress, block, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
// Method to be used by checkpoint CLI.
|
||||||
return this._baseIndexer.createCheckpoint(this, contractAddress, blockHash, data, checkpointInterval);
|
async createCheckpoint (contractAddress: string, blockHash: string): Promise<string | undefined> {
|
||||||
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
return this._baseIndexer.createCheckpoint(this, contractAddress, block);
|
||||||
}
|
}
|
||||||
|
|
||||||
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
async saveOrUpdateIPLDBlock (ipldBlock: IPLDBlock): Promise<IPLDBlock> {
|
||||||
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
return this._baseIndexer.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise<void> {
|
||||||
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
await this._baseIndexer.removeIPLDBlocks(blockNumber, kind);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -420,13 +441,12 @@ export class Indexer implements IndexerInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
|
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
|
||||||
const hookStatus = await this.getHookStatus();
|
return this._baseIndexer.getLatestHooksProcessedBlock();
|
||||||
assert(hookStatus);
|
|
||||||
|
|
||||||
return this._baseIndexer.getLatestHooksProcessedBlock(hookStatus);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
|
||||||
|
this._baseIndexer.updateIPLDStatusMap(address, {});
|
||||||
|
|
||||||
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
|
return this._baseIndexer.watchContract(address, kind, checkpoint, startingBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,11 +78,19 @@ export class JobRunner {
|
|||||||
|
|
||||||
const hookStatus = await this._indexer.getHookStatus();
|
const hookStatus = await this._indexer.getHookStatus();
|
||||||
|
|
||||||
if (hookStatus && hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
|
if (hookStatus) {
|
||||||
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
if (hookStatus.latestProcessedBlockNumber < (blockNumber - 1)) {
|
||||||
log(message);
|
const message = `Hooks for blockNumber ${blockNumber - 1} not processed yet, aborting`;
|
||||||
|
log(message);
|
||||||
|
|
||||||
throw new Error(message);
|
throw new Error(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hookStatus.latestProcessedBlockNumber > (blockNumber - 1)) {
|
||||||
|
log(`Hooks for blockNumber ${blockNumber} already processed`);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
await this._indexer.processCanonicalBlock(job);
|
await this._indexer.processCanonicalBlock(job);
|
||||||
|
@ -8,7 +8,7 @@ import debug from 'debug';
|
|||||||
import Decimal from 'decimal.js';
|
import Decimal from 'decimal.js';
|
||||||
import { GraphQLScalarType } from 'graphql';
|
import { GraphQLScalarType } from 'graphql';
|
||||||
|
|
||||||
import { ValueResult, BlockHeight, STATE_KIND_DIFF } from '@vulcanize/util';
|
import { ValueResult, BlockHeight, StateKind } from '@vulcanize/util';
|
||||||
|
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { EventWatcher } from './events';
|
import { EventWatcher } from './events';
|
||||||
@ -122,7 +122,7 @@ export const createResolvers = async (indexer: Indexer, eventWatcher: EventWatch
|
|||||||
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
return ipldBlock && ipldBlock.block.isComplete ? indexer.getResultIPLDBlock(ipldBlock) : undefined;
|
||||||
},
|
},
|
||||||
|
|
||||||
getState: async (_: any, { blockHash, contractAddress, kind = STATE_KIND_DIFF }: { blockHash: string, contractAddress: string, kind: string }) => {
|
getState: async (_: any, { blockHash, contractAddress, kind = StateKind.Diff }: { blockHash: string, contractAddress: string, kind: string }) => {
|
||||||
log('getState', blockHash, contractAddress, kind);
|
log('getState', blockHash, contractAddress, kind);
|
||||||
|
|
||||||
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
const ipldBlock = await indexer.getPrevIPLDBlock(blockHash, contractAddress, kind);
|
||||||
|
@ -23,8 +23,3 @@ export const UNKNOWN_EVENT_NAME = '__unknown__';
|
|||||||
|
|
||||||
export const KIND_ACTIVE = 'active';
|
export const KIND_ACTIVE = 'active';
|
||||||
export const KIND_LAZY = 'lazy';
|
export const KIND_LAZY = 'lazy';
|
||||||
|
|
||||||
export const STATE_KIND_INIT = 'init';
|
|
||||||
export const STATE_KIND_DIFF_STAGED = 'diff_staged';
|
|
||||||
export const STATE_KIND_DIFF = 'diff';
|
|
||||||
export const STATE_KIND_CHECKPOINT = 'checkpoint';
|
|
||||||
|
@ -4,12 +4,12 @@
|
|||||||
|
|
||||||
import { FindConditions, MoreThan, Repository } from 'typeorm';
|
import { FindConditions, MoreThan, Repository } from 'typeorm';
|
||||||
|
|
||||||
import { IPLDBlockInterface, HookStatusInterface } from './types';
|
import { IPLDBlockInterface, HookStatusInterface, StateKind } from './types';
|
||||||
import { Database } from './database';
|
import { Database } from './database';
|
||||||
import { MAX_REORG_DEPTH, STATE_KIND_DIFF_STAGED, STATE_KIND_DIFF } from './constants';
|
import { MAX_REORG_DEPTH } from './constants';
|
||||||
|
|
||||||
export class IPLDDatabase extends Database {
|
export class IPLDDatabase extends Database {
|
||||||
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
|
async getLatestIPLDBlock (repo: Repository<IPLDBlockInterface>, contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined> {
|
||||||
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
let queryBuilder = repo.createQueryBuilder('ipld_block')
|
||||||
.leftJoinAndSelect('ipld_block.block', 'block')
|
.leftJoinAndSelect('ipld_block.block', 'block')
|
||||||
.where('block.is_pruned = false')
|
.where('block.is_pruned = false')
|
||||||
@ -24,7 +24,7 @@ export class IPLDDatabase extends Database {
|
|||||||
// Filter using kind if specified else order by id to give preference to checkpoint.
|
// Filter using kind if specified else order by id to give preference to checkpoint.
|
||||||
queryBuilder = kind
|
queryBuilder = kind
|
||||||
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
|
||||||
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: STATE_KIND_DIFF_STAGED })
|
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged })
|
||||||
.addOrderBy('ipld_block.id', 'DESC');
|
.addOrderBy('ipld_block.id', 'DESC');
|
||||||
|
|
||||||
return queryBuilder.getOne();
|
return queryBuilder.getOne();
|
||||||
@ -116,7 +116,7 @@ export class IPLDDatabase extends Database {
|
|||||||
relations: ['block'],
|
relations: ['block'],
|
||||||
where: {
|
where: {
|
||||||
contractAddress,
|
contractAddress,
|
||||||
kind: STATE_KIND_DIFF,
|
kind: StateKind.Diff,
|
||||||
block: {
|
block: {
|
||||||
isPruned: false,
|
isPruned: false,
|
||||||
blockNumber: MoreThan(blockNumber)
|
blockNumber: MoreThan(blockNumber)
|
||||||
@ -132,6 +132,15 @@ export class IPLDDatabase extends Database {
|
|||||||
return repo.save(ipldBlock);
|
return repo.save(ipldBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async removeIPLDBlocks (repo: Repository<IPLDBlockInterface>, blockNumber: number, kind: string): Promise<void> {
|
||||||
|
const entities = await repo.find({ relations: ['block'], where: { block: { blockNumber }, kind } });
|
||||||
|
|
||||||
|
// Delete if entities found.
|
||||||
|
if (entities.length) {
|
||||||
|
await repo.delete(entities.map((entity) => entity.id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async getHookStatus (repo: Repository<HookStatusInterface>): Promise<HookStatusInterface | undefined> {
|
async getHookStatus (repo: Repository<HookStatusInterface>): Promise<HookStatusInterface | undefined> {
|
||||||
return repo.findOne();
|
return repo.findOne();
|
||||||
}
|
}
|
||||||
|
@ -16,23 +16,26 @@ import {
|
|||||||
IndexerInterface,
|
IndexerInterface,
|
||||||
BlockProgressInterface,
|
BlockProgressInterface,
|
||||||
IPLDBlockInterface,
|
IPLDBlockInterface,
|
||||||
HookStatusInterface
|
StateKind
|
||||||
} from './types';
|
} from './types';
|
||||||
import { Indexer } from './indexer';
|
import { Indexer } from './indexer';
|
||||||
import { ServerConfig } from './config';
|
import { ServerConfig } from './config';
|
||||||
import { IPFSClient } from './ipfs';
|
import { IPFSClient } from './ipfs';
|
||||||
import {
|
|
||||||
STATE_KIND_INIT,
|
|
||||||
STATE_KIND_DIFF_STAGED,
|
|
||||||
STATE_KIND_DIFF,
|
|
||||||
STATE_KIND_CHECKPOINT
|
|
||||||
} from './constants';
|
|
||||||
import { JobQueue } from './job-queue';
|
import { JobQueue } from './job-queue';
|
||||||
|
|
||||||
|
export interface IpldStatus {
|
||||||
|
init?: number;
|
||||||
|
diff?: number;
|
||||||
|
checkpoint?: number;
|
||||||
|
// eslint-disable-next-line camelcase
|
||||||
|
diff_staged?: number;
|
||||||
|
}
|
||||||
|
|
||||||
export class IPLDIndexer extends Indexer {
|
export class IPLDIndexer extends Indexer {
|
||||||
_serverConfig: ServerConfig;
|
_serverConfig: ServerConfig;
|
||||||
_ipldDb: IPLDDatabaseInterface;
|
_ipldDb: IPLDDatabaseInterface;
|
||||||
_ipfsClient: IPFSClient;
|
_ipfsClient: IPFSClient;
|
||||||
|
_ipldStatusMap: { [key: string]: IpldStatus } = {};
|
||||||
|
|
||||||
constructor (
|
constructor (
|
||||||
serverConfig: ServerConfig,
|
serverConfig: ServerConfig,
|
||||||
@ -65,7 +68,12 @@ export class IPLDIndexer extends Indexer {
|
|||||||
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
|
return (ipfsAddr !== undefined && ipfsAddr !== null && ipfsAddr !== '');
|
||||||
}
|
}
|
||||||
|
|
||||||
async getLatestHooksProcessedBlock (hookStatus: HookStatusInterface): Promise<BlockProgressInterface> {
|
async getLatestHooksProcessedBlock (): Promise<BlockProgressInterface> {
|
||||||
|
// Get current hookStatus.
|
||||||
|
const hookStatus = await this._ipldDb.getHookStatus();
|
||||||
|
assert(hookStatus, 'Hook status not found');
|
||||||
|
|
||||||
|
// Get all the blocks at height hookStatus.latestProcessedBlockNumber.
|
||||||
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
|
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
|
||||||
|
|
||||||
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
|
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
|
||||||
@ -76,26 +84,50 @@ export class IPLDIndexer extends Indexer {
|
|||||||
|
|
||||||
async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise<void> {
|
async processCheckpoint (indexer: IndexerInterface, blockHash: string, checkpointInterval: number): Promise<void> {
|
||||||
// Get all the contracts.
|
// Get all the contracts.
|
||||||
assert(this._ipldDb.getContracts);
|
const contracts = Object.values(this._watchedContracts);
|
||||||
const contracts = await this._ipldDb.getContracts();
|
|
||||||
|
// Getting the block for checkpoint.
|
||||||
|
const block = await this.getBlockProgress(blockHash);
|
||||||
|
assert(block);
|
||||||
|
|
||||||
// For each contract, merge the diff till now to create a checkpoint.
|
// For each contract, merge the diff till now to create a checkpoint.
|
||||||
for (const contract of contracts) {
|
for (const contract of contracts) {
|
||||||
|
// Get IPLD status for the contract.
|
||||||
|
const ipldStatus = this._ipldStatusMap[contract.address];
|
||||||
|
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
|
||||||
|
|
||||||
|
const initBlockNumber = ipldStatus.init;
|
||||||
|
|
||||||
// Check if contract has checkpointing on.
|
// Check if contract has checkpointing on.
|
||||||
if (contract.checkpoint) {
|
// Check if it's time for a checkpoint or the init is in current block.
|
||||||
await this.createCheckpoint(indexer, contract.address, blockHash, null, checkpointInterval);
|
if (
|
||||||
|
contract.checkpoint &&
|
||||||
|
(block.blockNumber % checkpointInterval === 0 || initBlockNumber === block.blockNumber)
|
||||||
|
) {
|
||||||
|
await this.createCheckpoint(indexer, contract.address, block);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise<string | undefined> {
|
async processCLICheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string): Promise<string | undefined> {
|
||||||
const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, blockHash);
|
// Getting the block for checkpoint.
|
||||||
assert(checkpointBlockHash);
|
let block;
|
||||||
|
|
||||||
|
if (blockHash) {
|
||||||
|
block = await this.getBlockProgress(blockHash);
|
||||||
|
} else {
|
||||||
|
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
|
||||||
|
block = await this.getLatestHooksProcessedBlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(block);
|
||||||
|
|
||||||
|
const checkpointBlockHash = await this.createCheckpoint(indexer, contractAddress, block);
|
||||||
|
assert(checkpointBlockHash, 'Checkpoint not created');
|
||||||
|
|
||||||
// Push checkpoint to IPFS if configured.
|
// Push checkpoint to IPFS if configured.
|
||||||
if (this.isIPFSConfigured()) {
|
if (this.isIPFSConfigured()) {
|
||||||
const block = await this.getBlockProgress(checkpointBlockHash);
|
const checkpointIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind: StateKind.Checkpoint });
|
||||||
const checkpointIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind: STATE_KIND_CHECKPOINT });
|
|
||||||
|
|
||||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||||
assert(checkpointIPLDBlocks.length <= 1);
|
assert(checkpointIPLDBlocks.length <= 1);
|
||||||
@ -108,34 +140,61 @@ export class IPLDIndexer extends Indexer {
|
|||||||
return checkpointBlockHash;
|
return checkpointBlockHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async createStateCheckpoint (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
|
||||||
|
// Get the contract.
|
||||||
|
const contract = this._watchedContracts[contractAddress];
|
||||||
|
assert(contract, `Contract ${contractAddress} not watched`);
|
||||||
|
|
||||||
|
if (block.blockNumber < contract.startingBlock) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a checkpoint from the hook data without being concerned about diffs.
|
||||||
|
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Checkpoint);
|
||||||
|
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
|
}
|
||||||
|
|
||||||
async createInit (
|
async createInit (
|
||||||
indexer: IndexerInterface,
|
indexer: IndexerInterface,
|
||||||
blockHash: string,
|
blockHash: string,
|
||||||
blockNumber: number
|
blockNumber: number
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
// Get all the contracts.
|
// Get all the contracts.
|
||||||
assert(this._ipldDb.getContracts);
|
const contracts = Object.values(this._watchedContracts);
|
||||||
const contracts = await this._ipldDb.getContracts();
|
|
||||||
|
|
||||||
// Create an initial state for each contract.
|
// Create an initial state for each contract.
|
||||||
for (const contract of contracts) {
|
for (const contract of contracts) {
|
||||||
// Check if contract has checkpointing on.
|
// Check if contract has checkpointing on.
|
||||||
if (contract.checkpoint) {
|
if (contract.checkpoint) {
|
||||||
// Check if a 'diff' | 'checkpoint' ipldBlock already exists or blockNumber is < to startingBlock.
|
// Check if starting block not reached yet.
|
||||||
const existingIpldBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, null);
|
if (blockNumber < contract.startingBlock) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (existingIpldBlock || blockNumber < contract.startingBlock) {
|
// Get IPLD status for the contract.
|
||||||
|
const ipldStatus = this._ipldStatusMap[contract.address];
|
||||||
|
assert(ipldStatus, `IPLD status for contract ${contract.address} not found`);
|
||||||
|
|
||||||
|
// Check if a 'init' IPLDBlock already exists.
|
||||||
|
// Or if a 'diff' IPLDBlock already exists.
|
||||||
|
// Or if a 'checkpoint' IPLDBlock already exists.
|
||||||
|
// (A watcher with imported state won't have an init IPLDBlock, but it will have the imported checkpoint)
|
||||||
|
if (
|
||||||
|
ipldStatus.init ||
|
||||||
|
ipldStatus.diff ||
|
||||||
|
ipldStatus.checkpoint
|
||||||
|
) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call initial state hook.
|
// Call initial state hook.
|
||||||
assert(indexer.createInitialState);
|
assert(indexer.processInitialState);
|
||||||
const stateData = await indexer.createInitialState(contract.address, blockHash);
|
const stateData = await indexer.processInitialState(contract.address, blockHash);
|
||||||
|
|
||||||
const block = await this.getBlockProgress(blockHash);
|
const block = await this.getBlockProgress(blockHash);
|
||||||
assert(block);
|
assert(block);
|
||||||
|
|
||||||
const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, STATE_KIND_INIT);
|
const ipldBlock = await this.prepareIPLDBlock(block, contract.address, stateData, StateKind.Init);
|
||||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
|
|
||||||
// Push initial state to IPFS if configured.
|
// Push initial state to IPFS if configured.
|
||||||
@ -151,8 +210,16 @@ export class IPLDIndexer extends Indexer {
|
|||||||
const block = await this.getBlockProgress(blockHash);
|
const block = await this.getBlockProgress(blockHash);
|
||||||
assert(block);
|
assert(block);
|
||||||
|
|
||||||
|
// Get the contract.
|
||||||
|
const contract = this._watchedContracts[contractAddress];
|
||||||
|
assert(contract, `Contract ${contractAddress} not watched`);
|
||||||
|
|
||||||
|
if (block.blockNumber < contract.startingBlock) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Create a staged diff block.
|
// Create a staged diff block.
|
||||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, STATE_KIND_DIFF_STAGED);
|
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.DiffStaged);
|
||||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,76 +228,75 @@ export class IPLDIndexer extends Indexer {
|
|||||||
assert(block);
|
assert(block);
|
||||||
|
|
||||||
// Get all the staged diff blocks for the given blockHash.
|
// Get all the staged diff blocks for the given blockHash.
|
||||||
const stagedBlocks = await this._ipldDb.getIPLDBlocks({ block, kind: STATE_KIND_DIFF_STAGED });
|
const stagedBlocks = await this._ipldDb.getIPLDBlocks({ block, kind: StateKind.DiffStaged });
|
||||||
|
|
||||||
// For each staged block, create a diff block.
|
// For each staged block, create a diff block.
|
||||||
for (const stagedBlock of stagedBlocks) {
|
for (const stagedBlock of stagedBlocks) {
|
||||||
const data = codec.decode(Buffer.from(stagedBlock.data));
|
const data = codec.decode(Buffer.from(stagedBlock.data));
|
||||||
await this.createDiff(stagedBlock.contractAddress, stagedBlock.block.blockHash, data);
|
await this.createDiff(stagedBlock.contractAddress, block, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove all the staged diff blocks for current blockNumber.
|
// Remove all the staged diff blocks for current blockNumber.
|
||||||
await this.removeIPLDBlocks(block.blockNumber, STATE_KIND_DIFF_STAGED);
|
// (Including staged diff blocks associated with pruned blocks)
|
||||||
|
await this.removeIPLDBlocks(block.blockNumber, StateKind.DiffStaged);
|
||||||
}
|
}
|
||||||
|
|
||||||
async createDiff (contractAddress: string, blockHash: string, data: any): Promise<void> {
|
async createDiff (contractAddress: string, block: BlockProgressInterface, data: any): Promise<void> {
|
||||||
const block = await this.getBlockProgress(blockHash);
|
// Get the contract.
|
||||||
assert(block);
|
const contract = this._watchedContracts[contractAddress];
|
||||||
|
assert(contract, `Contract ${contractAddress} not watched`);
|
||||||
// Fetch the latest checkpoint for the contract.
|
|
||||||
const checkpoint = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_CHECKPOINT);
|
|
||||||
|
|
||||||
// There should be an initial state at least.
|
|
||||||
if (!checkpoint) {
|
|
||||||
// Fetch the initial state for the contract.
|
|
||||||
const initState = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_INIT);
|
|
||||||
assert(initState, 'No initial state found');
|
|
||||||
} else {
|
|
||||||
// Check if the latest checkpoint is in the same block.
|
|
||||||
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash');
|
|
||||||
}
|
|
||||||
|
|
||||||
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, STATE_KIND_DIFF);
|
|
||||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
async createCheckpoint (indexer: IndexerInterface, contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
|
|
||||||
// Get current hookStatus.
|
|
||||||
const hookStatus = await this._ipldDb.getHookStatus();
|
|
||||||
assert(hookStatus);
|
|
||||||
|
|
||||||
// Getting the current block.
|
|
||||||
let currentBlock;
|
|
||||||
|
|
||||||
if (blockHash) {
|
|
||||||
currentBlock = await this.getBlockProgress(blockHash);
|
|
||||||
} else {
|
|
||||||
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
|
|
||||||
currentBlock = await this.getLatestHooksProcessedBlock(hookStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(currentBlock);
|
|
||||||
|
|
||||||
// Data is passed in case of checkpoint hook.
|
|
||||||
if (data) {
|
|
||||||
// Create a checkpoint from the hook data without being concerned about diffs.
|
|
||||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, STATE_KIND_CHECKPOINT);
|
|
||||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
|
||||||
|
|
||||||
|
if (block.blockNumber < contract.startingBlock) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If data is not passed, create from previous 'checkpoint' | 'init' and diffs after that.
|
// Get IPLD status for the contract.
|
||||||
|
const ipldStatus = this._ipldStatusMap[contractAddress];
|
||||||
|
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
|
||||||
|
|
||||||
|
// Get the latest checkpoint block number.
|
||||||
|
const checkpointBlockNumber = ipldStatus.checkpoint;
|
||||||
|
|
||||||
|
if (!checkpointBlockNumber) {
|
||||||
|
// Get the initial state block number.
|
||||||
|
const initBlockNumber = ipldStatus.init;
|
||||||
|
|
||||||
|
// There should be an initial state at least.
|
||||||
|
assert(initBlockNumber, 'No initial state found');
|
||||||
|
} else if (checkpointBlockNumber === block.blockNumber) {
|
||||||
|
// Check if the latest checkpoint is in the same block if block number is same.
|
||||||
|
const checkpoint = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint);
|
||||||
|
assert(checkpoint);
|
||||||
|
|
||||||
|
assert(checkpoint.block.blockHash !== block.blockHash, 'Checkpoint already created for the block hash');
|
||||||
|
}
|
||||||
|
|
||||||
|
const ipldBlock = await this.prepareIPLDBlock(block, contractAddress, data, StateKind.Diff);
|
||||||
|
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
async createCheckpoint (indexer: IndexerInterface, contractAddress: string, currentBlock: BlockProgressInterface): Promise<string | undefined> {
|
||||||
|
// Get the contract.
|
||||||
|
const contract = this._watchedContracts[contractAddress];
|
||||||
|
assert(contract, `Contract ${contractAddress} not watched`);
|
||||||
|
|
||||||
|
if (currentBlock.blockNumber < contract.startingBlock) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure the block is marked complete.
|
// Make sure the block is marked complete.
|
||||||
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
|
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
|
||||||
|
|
||||||
|
// Get current hookStatus.
|
||||||
|
const hookStatus = await this._ipldDb.getHookStatus();
|
||||||
|
assert(hookStatus);
|
||||||
|
|
||||||
// Make sure the hooks have been processed for the block.
|
// Make sure the hooks have been processed for the block.
|
||||||
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
|
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
|
||||||
|
|
||||||
// Call state checkpoint hook and check if default checkpoint is disabled.
|
// Call state checkpoint hook and check if default checkpoint is disabled.
|
||||||
assert(indexer.createStateCheckpoint);
|
assert(indexer.processStateCheckpoint);
|
||||||
const disableDefaultCheckpoint = await indexer.createStateCheckpoint(contractAddress, currentBlock.blockHash);
|
const disableDefaultCheckpoint = await indexer.processStateCheckpoint(contractAddress, currentBlock.blockHash);
|
||||||
|
|
||||||
if (disableDefaultCheckpoint) {
|
if (disableDefaultCheckpoint) {
|
||||||
// Return if default checkpoint is disabled.
|
// Return if default checkpoint is disabled.
|
||||||
@ -238,26 +304,28 @@ export class IPLDIndexer extends Indexer {
|
|||||||
return currentBlock.blockHash;
|
return currentBlock.blockHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch the latest 'checkpoint' | 'init' for the contract.
|
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
|
||||||
let prevNonDiffBlock: IPLDBlockInterface;
|
let prevNonDiffBlock: IPLDBlockInterface;
|
||||||
let getDiffBlockNumber: number;
|
let getDiffBlockNumber: number;
|
||||||
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_CHECKPOINT, currentBlock.blockNumber);
|
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber);
|
||||||
|
|
||||||
if (checkpointBlock) {
|
if (checkpointBlock) {
|
||||||
prevNonDiffBlock = checkpointBlock;
|
const checkpointBlockNumber = checkpointBlock.block.blockNumber;
|
||||||
getDiffBlockNumber = checkpointBlock.block.blockNumber;
|
|
||||||
|
|
||||||
// Check (only if checkpointInterval is passed) if it is time for a new checkpoint.
|
prevNonDiffBlock = checkpointBlock;
|
||||||
if (checkpointInterval && checkpointBlock.block.blockNumber > (currentBlock.blockNumber - checkpointInterval)) {
|
getDiffBlockNumber = checkpointBlockNumber;
|
||||||
return;
|
|
||||||
}
|
// Update IPLD status map with the latest checkpoint info.
|
||||||
|
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
|
||||||
|
// (job-runner won't have the updated ipld status)
|
||||||
|
this.updateIPLDStatusMap(contractAddress, { checkpoint: checkpointBlockNumber });
|
||||||
} else {
|
} else {
|
||||||
// There should be an initial state at least.
|
// There should be an initial state at least.
|
||||||
const initBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, STATE_KIND_INIT);
|
const initBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Init);
|
||||||
assert(initBlock, 'No initial state found');
|
assert(initBlock, 'No initial state found');
|
||||||
|
|
||||||
prevNonDiffBlock = initBlock;
|
prevNonDiffBlock = initBlock;
|
||||||
// Take block number previous to initial state block to get diffs after that.
|
// Take block number previous to initial state block as the checkpoint is to be created in the same block.
|
||||||
getDiffBlockNumber = initBlock.block.blockNumber - 1;
|
getDiffBlockNumber = initBlock.block.blockNumber - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -265,7 +333,7 @@ export class IPLDIndexer extends Indexer {
|
|||||||
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);
|
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);
|
||||||
|
|
||||||
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
|
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
|
||||||
data = {
|
const data = {
|
||||||
state: prevNonDiffBlockData.state
|
state: prevNonDiffBlockData.state
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -274,43 +342,46 @@ export class IPLDIndexer extends Indexer {
|
|||||||
data.state = _.merge(data.state, diff.state);
|
data.state = _.merge(data.state, diff.state);
|
||||||
}
|
}
|
||||||
|
|
||||||
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, STATE_KIND_CHECKPOINT);
|
const ipldBlock = await this.prepareIPLDBlock(currentBlock, contractAddress, data, StateKind.Checkpoint);
|
||||||
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
await this.saveOrUpdateIPLDBlock(ipldBlock);
|
||||||
|
|
||||||
return currentBlock.blockHash;
|
return currentBlock.blockHash;
|
||||||
}
|
}
|
||||||
|
|
||||||
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: string):Promise<any> {
|
async prepareIPLDBlock (block: BlockProgressInterface, contractAddress: string, data: any, kind: StateKind):Promise<any> {
|
||||||
assert(_.includes([
|
let ipldBlock: IPLDBlockInterface;
|
||||||
STATE_KIND_INIT,
|
|
||||||
STATE_KIND_DIFF_STAGED,
|
|
||||||
STATE_KIND_DIFF,
|
|
||||||
STATE_KIND_CHECKPOINT
|
|
||||||
], kind));
|
|
||||||
|
|
||||||
// Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress.
|
// Get IPLD status for the contract.
|
||||||
const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind });
|
const ipldStatus = this._ipldStatusMap[contractAddress];
|
||||||
|
assert(ipldStatus, `IPLD status for contract ${contractAddress} not found`);
|
||||||
|
|
||||||
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
// Get an existing 'init' | 'diff' | 'diff_staged' | 'checkpoint' IPLDBlock for current block, contractAddress to update.
|
||||||
assert(currentIPLDBlocks.length <= 1);
|
let currentIPLDBlock: IPLDBlockInterface | undefined;
|
||||||
const currentIPLDBlock = currentIPLDBlocks[0];
|
const prevIPLDBlockNumber = ipldStatus[kind];
|
||||||
|
|
||||||
// Update currentIPLDBlock of same kind if it exists.
|
if (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber) {
|
||||||
let ipldBlock;
|
const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind });
|
||||||
|
|
||||||
|
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
|
||||||
|
assert(currentIPLDBlocks.length <= 1);
|
||||||
|
currentIPLDBlock = currentIPLDBlocks[0];
|
||||||
|
}
|
||||||
|
|
||||||
if (currentIPLDBlock) {
|
if (currentIPLDBlock) {
|
||||||
|
// Update current IPLDBlock of same kind if it exists.
|
||||||
ipldBlock = currentIPLDBlock;
|
ipldBlock = currentIPLDBlock;
|
||||||
|
|
||||||
// Update the data field.
|
// Update the data field.
|
||||||
const oldData = codec.decode(Buffer.from(currentIPLDBlock.data));
|
const oldData = codec.decode(Buffer.from(ipldBlock.data));
|
||||||
data = _.merge(oldData, data);
|
data = _.merge(oldData, data);
|
||||||
} else {
|
} else {
|
||||||
|
// Create a new IPLDBlock instance.
|
||||||
ipldBlock = this._ipldDb.getNewIPLDBlock();
|
ipldBlock = this._ipldDb.getNewIPLDBlock();
|
||||||
|
|
||||||
// Fetch the parent IPLDBlock.
|
// Fetch the parent IPLDBlock.
|
||||||
const parentIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
|
const parentIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, null, block.blockNumber);
|
||||||
|
|
||||||
// Setting the meta-data for an IPLDBlock (done only once per block).
|
// Setting the meta-data for an IPLDBlock (done only once per IPLD block).
|
||||||
data.meta = {
|
data.meta = {
|
||||||
id: contractAddress,
|
id: contractAddress,
|
||||||
kind,
|
kind,
|
||||||
@ -369,7 +440,15 @@ export class IPLDIndexer extends Indexer {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
res = await this._ipldDb.saveOrUpdateIPLDBlock(dbTx, ipldBlock);
|
res = await this._ipldDb.saveOrUpdateIPLDBlock(dbTx, ipldBlock);
|
||||||
|
|
||||||
await dbTx.commitTransaction();
|
await dbTx.commitTransaction();
|
||||||
|
|
||||||
|
// Get IPLD status for the contract.
|
||||||
|
const ipldStatus = this._ipldStatusMap[res.contractAddress];
|
||||||
|
assert(ipldStatus, `IPLD status for contract ${res.contractAddress} not found`);
|
||||||
|
|
||||||
|
// Update the IPLD status for the kind.
|
||||||
|
ipldStatus[res.kind] = res.block.blockNumber;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
await dbTx.rollbackTransaction();
|
await dbTx.rollbackTransaction();
|
||||||
throw error;
|
throw error;
|
||||||
@ -380,7 +459,7 @@ export class IPLDIndexer extends Indexer {
|
|||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeIPLDBlocks (blockNumber: number, kind: string): Promise<void> {
|
async removeIPLDBlocks (blockNumber: number, kind: StateKind): Promise<void> {
|
||||||
const dbTx = await this._db.createTransactionRunner();
|
const dbTx = await this._db.createTransactionRunner();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -393,4 +472,28 @@ export class IPLDIndexer extends Indexer {
|
|||||||
await dbTx.release();
|
await dbTx.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fetchIPLDStatus (): Promise<void> {
|
||||||
|
const contracts = Object.values(this._watchedContracts);
|
||||||
|
|
||||||
|
for (const contract of contracts) {
|
||||||
|
const initIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Init);
|
||||||
|
const diffIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Diff);
|
||||||
|
const diffStagedIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.DiffStaged);
|
||||||
|
const checkpointIPLDBlock = await this._ipldDb.getLatestIPLDBlock(contract.address, StateKind.Checkpoint);
|
||||||
|
|
||||||
|
this._ipldStatusMap[contract.address] = {
|
||||||
|
init: initIPLDBlock?.block.blockNumber,
|
||||||
|
diff: diffIPLDBlock?.block.blockNumber,
|
||||||
|
diff_staged: diffStagedIPLDBlock?.block.blockNumber,
|
||||||
|
checkpoint: checkpointIPLDBlock?.block.blockNumber
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async updateIPLDStatusMap (address: string, ipldStatus: IpldStatus): Promise<void> {
|
||||||
|
// Get and update IPLD status for the contract.
|
||||||
|
const ipldStatusOld = this._ipldStatusMap[address];
|
||||||
|
this._ipldStatusMap[address] = _.merge(ipldStatusOld, ipldStatus);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,13 @@ import { Connection, DeepPartial, FindConditions, FindManyOptions, QueryRunner }
|
|||||||
|
|
||||||
import { Where, QueryOptions } from './database';
|
import { Where, QueryOptions } from './database';
|
||||||
|
|
||||||
|
export enum StateKind {
|
||||||
|
Diff = 'diff',
|
||||||
|
Init = 'init',
|
||||||
|
DiffStaged = 'diff_staged',
|
||||||
|
Checkpoint = 'checkpoint'
|
||||||
|
}
|
||||||
|
|
||||||
export interface BlockProgressInterface {
|
export interface BlockProgressInterface {
|
||||||
id: number;
|
id: number;
|
||||||
cid: string;
|
cid: string;
|
||||||
@ -61,7 +68,7 @@ export interface IPLDBlockInterface {
|
|||||||
block: BlockProgressInterface;
|
block: BlockProgressInterface;
|
||||||
contractAddress: string;
|
contractAddress: string;
|
||||||
cid: string;
|
cid: string;
|
||||||
kind: string;
|
kind: StateKind;
|
||||||
data: Buffer;
|
data: Buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -89,8 +96,8 @@ export interface IndexerInterface {
|
|||||||
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
|
watchContract?: (address: string, kind: string, checkpoint: boolean, startingBlock: number) => Promise<void>
|
||||||
getEntityTypesMap?: () => Map<string, { [key: string]: string }>
|
getEntityTypesMap?: () => Map<string, { [key: string]: string }>
|
||||||
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
|
createDiffStaged?: (contractAddress: string, blockHash: string, data: any) => Promise<void>
|
||||||
createInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
|
processInitialState?: (contractAddress: string, blockHash: string) => Promise<any>
|
||||||
createStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
|
processStateCheckpoint?: (contractAddress: string, blockHash: string) => Promise<boolean>
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface EventWatcherInterface {
|
export interface EventWatcherInterface {
|
||||||
@ -126,11 +133,11 @@ export interface DatabaseInterface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export interface IPLDDatabaseInterface extends DatabaseInterface {
|
export interface IPLDDatabaseInterface extends DatabaseInterface {
|
||||||
getLatestIPLDBlock (contractAddress: string, kind: string | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined>
|
getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined>
|
||||||
getIPLDBlocks (where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]>
|
getIPLDBlocks (where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]>
|
||||||
getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]>
|
getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]>
|
||||||
getNewIPLDBlock (): IPLDBlockInterface
|
getNewIPLDBlock (): IPLDBlockInterface
|
||||||
removeIPLDBlocks(dbTx: QueryRunner, blockNumber: number, kind: string): Promise<void>
|
removeIPLDBlocks(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
|
||||||
saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface>
|
saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface>
|
||||||
getHookStatus (): Promise<HookStatusInterface | undefined>
|
getHookStatus (): Promise<HookStatusInterface | undefined>
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user