Fixes and improvements for eden-watcher job-runner and compare CLI (#165)

* Compare IPLD state entity without derived fields

* Apply default limit to array relation fields in IPLD state entity

* Mark block as complete after processing of block handler

* Avoid re processing of block handler

* Use LIMIT 1 in the query to get latest IPLD block

* Replace eth_calls in eden-watcher with getStorageValue

* Add checkpoint verification to export state CLI

* Fix get diff blocks query when creating checkpoint

* Fix subgraph staker sort and remove entities sequentially in reset CLI

Co-authored-by: prathamesh0 <prathamesh.musale0@gmail.com>
This commit is contained in:
nikugogoi 2022-08-26 12:02:39 +05:30 committed by GitHub
parent a5b3c7942d
commit 97e88ab5f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 327 additions and 132 deletions

View File

@ -102,10 +102,10 @@ export class Database implements IPLDDatabaseInterface {
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startBlock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {

View File

@ -9,7 +9,17 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
import {
Config,
DEFAULT_CONFIG_PATH,
getConfig,
initClients,
JobQueue,
{{#if (subgraphPath)}}
verifyCheckpointData,
{{/if}}
StateKind
} from '@vulcanize/util';
{{#if (subgraphPath)}}
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
{{/if}}
@ -36,6 +46,20 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
{{#if (subgraphPath)}}
verify: {
alias: 'v',
type: 'boolean',
describe: 'Verify checkpoint',
default: true
},
{{/if}}
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
}
}).argv;
@ -98,13 +122,24 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
await indexer.createCheckpoint(contract.address, block.blockHash);
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);
const data = indexer.getIPLDData(ipldBlock);
{{#if (subgraphPath)}}
if (argv.verify) {
log(`Verifying checkpoint data for contract ${contract.address}`);
await verifyCheckpointData(graphDb, ipldBlock.block, data);
log('Checkpoint data verified');
}
{{/if}}
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}

View File

@ -9,7 +9,7 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind, verifyCheckpointData } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
@ -34,6 +34,18 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
},
verify: {
alias: 'v',
type: 'boolean',
describe: 'Verify checkpoint',
default: true
}
}).argv;
@ -92,13 +104,22 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
await indexer.createCheckpoint(contract.address, block.blockHash);
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);
const data = indexer.getIPLDData(ipldBlock);
if (argv.verify) {
log(`Verifying checkpoint data for contract ${contract.address}`);
await verifyCheckpointData(graphDb, ipldBlock.block, data);
log('Checkpoint data verified');
}
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}

View File

@ -83,11 +83,9 @@ export const handler = async (argv: any): Promise<void> => {
try {
const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -62,10 +62,10 @@ export class Database implements IPLDDatabaseInterface {
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
async getDiffIPLDBlocksInRange (contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startblock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {

View File

@ -60,14 +60,14 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
let prevNonDiffBlock: IPLDBlockInterface;
let getDiffBlockNumber: number;
const checkpointBlock = await indexer.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, block.blockNumber);
let diffStartBlockNumber: number;
const checkpointBlock = await indexer.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, block.blockNumber - 1);
if (checkpointBlock) {
const checkpointBlockNumber = checkpointBlock.block.blockNumber;
prevNonDiffBlock = checkpointBlock;
getDiffBlockNumber = checkpointBlockNumber;
diffStartBlockNumber = checkpointBlockNumber;
// Update IPLD status map with the latest checkpoint info.
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
@ -80,11 +80,11 @@ export async function createStateCheckpoint (indexer: Indexer, contractAddress:
prevNonDiffBlock = initBlock;
// Take block number previous to initial state block to include any diff state at that block.
getDiffBlockNumber = initBlock.block.blockNumber - 1;
diffStartBlockNumber = initBlock.block.blockNumber - 1;
}
// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
const diffBlocks = await indexer.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);
const diffBlocks = await indexer.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, block.blockNumber);
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
const data = {

View File

@ -303,8 +303,8 @@ export class Indexer implements IPLDIndexerInterface {
return this._baseIndexer.getIPLDBlockByCid(cid);
}
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
return this._db.getDiffIPLDBlocksByBlocknumber(contractAddress, blockNumber);
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
return this._db.getDiffIPLDBlocksInRange(contractAddress, startBlock, endBlock);
}
getIPLDData (ipldBlock: IPLDBlock): any {

View File

@ -56,11 +56,9 @@ export const handler = async (argv: any): Promise<void> => {
const dbTx = await db.createTransactionRunner();
try {
const removeEntitiesPromise = [BlockProgress, Allowance, Balance].map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
for (const entity of [BlockProgress, Allowance, Balance]) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);

View File

@ -33,6 +33,12 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
}
}).argv;
@ -83,7 +89,10 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
await indexer.createCheckpoint(contract.address, block.blockHash);
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -70,11 +70,9 @@ export const handler = async (argv: any): Promise<void> => {
try {
const entities = [BlockProgress, SupportsInterface, BalanceOf, OwnerOf, GetApproved, IsApprovedForAll, Name, Symbol, TokenURI, _Name, _Symbol, _Owners, _Balances, _TokenApprovals, _OperatorApprovals];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -323,10 +323,10 @@ export class Database implements IPLDDatabaseInterface {
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startBlock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {

View File

@ -5,14 +5,14 @@
[queries]
queryDir = "../graph-test-watcher/src/gql/queries"
names = []
idsEndpoint = "gqlEndpoint1"
blockDelayInMs = 250
[watcher]
configpath = "../../graph-test-watcher/environments/local.toml"
configPath = "../../graph-test-watcher/environments/local.toml"
entitiesDir = "../../graph-test-watcher/src/entity"
endpoint = "gqlEndpoint2"
verifyState = true
derivedFields = []
[cache]
endpoint = "gqlEndpoint1"

View File

@ -9,11 +9,10 @@ import path from 'path';
import assert from 'assert';
import { SnakeNamingStrategy } from 'typeorm-naming-strategies';
import _ from 'lodash';
import omitDeep from 'omit-deep';
import { getConfig as getWatcherConfig, wait } from '@vulcanize/util';
import { GraphQLClient } from '@vulcanize/ipld-eth-client';
import { compareObjects, compareQuery, Config, getBlockIPLDState as getIPLDStateByBlock, getClients, getConfig } from './utils';
import { checkEntityInIPLDState, compareQuery, Config, getBlockIPLDState as getIPLDStateByBlock, getClients, getConfig } from './utils';
import { Database } from '../../database';
import { getSubgraphConfig } from '../../utils';
@ -130,7 +129,12 @@ export const main = async (): Promise<void> => {
);
if (config.watcher.verifyState) {
await checkEntityInIPLDState(ipldStateByBlock, queryName, result, id, rawJson);
const ipldDiff = await checkEntityInIPLDState(ipldStateByBlock, queryName, result, id, rawJson, config.watcher.derivedFields);
if (ipldDiff) {
log('Results mismatch for IPLD state:', ipldDiff);
diffFound = true;
}
}
if (diff) {
@ -167,22 +171,3 @@ export const main = async (): Promise<void> => {
process.exit(1);
}
};
const checkEntityInIPLDState = async (
ipldState: {[key: string]: any},
queryName: string,
entityResult: {[key: string]: any},
id: string,
rawJson: boolean
) => {
const entityName = _.startCase(queryName);
const ipldEntity = ipldState[entityName][id];
// Filter __typename key in GQL result.
const resultEntity = omitDeep(entityResult[queryName], '__typename');
const diff = compareObjects(ipldEntity, resultEntity, rawJson);
if (diff) {
log('Results mismatch for IPLD state:', diff);
}
};

View File

@ -9,12 +9,14 @@ import toml from 'toml';
import fs from 'fs-extra';
import { diffString, diff } from 'json-diff';
import _ from 'lodash';
import omitDeep from 'omit-deep';
import { Config as CacheConfig, getCache } from '@vulcanize/cache';
import { GraphQLClient } from '@vulcanize/ipld-eth-client';
import { gql } from '@apollo/client/core';
import { Client } from './client';
import { DEFAULT_LIMIT } from '../../database';
const IPLD_STATE_QUERY = `
query getState($blockHash: String!, $contractAddress: String!, $kind: String){
@ -36,6 +38,11 @@ interface QueryConfig {
blockDelayInMs: number;
}
interface EntityDerivedFields {
entity: string;
fields: string[];
}
export interface Config {
endpoints: EndpointConfig;
queries: QueryConfig;
@ -44,6 +51,7 @@ export interface Config {
entitiesDir: string;
verifyState: boolean;
endpoint: keyof EndpointConfig;
derivedFields: EntityDerivedFields[]
}
cache: {
endpoint: keyof EndpointConfig;
@ -154,6 +162,25 @@ export const getBlockIPLDState = async (client: GraphQLClient, contracts: string
if (getState) {
const data = JSON.parse(getState.data);
// Apply default limit on array type relation fields.
Object.values(data.state)
.forEach((idEntityMap: any) => {
Object.values(idEntityMap)
.forEach((entity: any) => {
Object.values(entity)
.forEach(fieldValue => {
if (
Array.isArray(fieldValue) &&
fieldValue.length &&
fieldValue[0].id
) {
fieldValue.splice(DEFAULT_LIMIT);
}
});
});
});
return data.state;
}
@ -163,7 +190,35 @@ export const getBlockIPLDState = async (client: GraphQLClient, contracts: string
return contractIPLDStates.reduce((acc, state) => _.merge(acc, state));
};
export const compareObjects = (obj1: any, obj2: any, rawJson: boolean): string => {
export const checkEntityInIPLDState = async (
ipldState: {[key: string]: any},
queryName: string,
entityResult: {[key: string]: any},
id: string,
rawJson: boolean,
derivedFields: EntityDerivedFields[] = []
): Promise<string> => {
const entityName = _.upperFirst(queryName);
const ipldEntity = ipldState[entityName][id];
// Filter __typename key in GQL result.
const resultEntity = omitDeep(entityResult[queryName], '__typename');
// Filter derived fields in GQL result.
derivedFields.forEach(({ entity, fields }) => {
if (entityName === entity) {
fields.forEach(field => {
delete resultEntity[field];
});
}
});
const diff = compareObjects(ipldEntity, resultEntity, rawJson);
return diff;
};
const compareObjects = (obj1: any, obj2: any, rawJson: boolean): string => {
if (rawJson) {
const diffObj = diff(obj1, obj2);

View File

@ -19,7 +19,7 @@ import {
import { Block, fromEntityValue, toEntityValue } from './utils';
const DEFAULT_LIMIT = 100;
export const DEFAULT_LIMIT = 100;
export class Database {
_config: ConnectionOptions

View File

@ -77,7 +77,8 @@ xdescribe('eden wasm loader tests', async () => {
},
dataSource: {
address: contractAddress,
network: 'mainnet'
network: 'mainnet',
name: 'EdenNetwork'
}
};
@ -197,7 +198,8 @@ xdescribe('eden wasm loader tests', async () => {
},
dataSource: {
address: contractAddress,
network: 'mainnet'
network: 'mainnet',
name: 'EdenNetworkDistribution'
}
};
@ -313,7 +315,8 @@ xdescribe('eden wasm loader tests', async () => {
},
dataSource: {
address: contractAddress,
network: 'mainnet'
network: 'mainnet',
name: 'EdenNetworkGovernance'
}
};

View File

@ -29,7 +29,8 @@ xdescribe('eth-call wasm tests', () => {
},
dataSource: {
address: contractAddress,
network: 'mainnet'
network: 'mainnet',
name: 'Example1'
}
};

View File

@ -15,7 +15,7 @@ import debug from 'debug';
import { BaseProvider } from '@ethersproject/providers';
import loader from '@vulcanize/assemblyscript/lib/loader';
import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp } from '@vulcanize/util';
import { IndexerInterface, GraphDecimal, getGraphDigitsAndExp, jsonBigIntStringReplacer } from '@vulcanize/util';
import { TypeId, Level } from './types';
import {
@ -25,8 +25,7 @@ import {
resolveEntityFieldConflicts,
getEthereumTypes,
jsonFromBytes,
getStorageValueType,
jsonBigIntStringReplacer
getStorageValueType
} from './utils';
import { Database } from './database';
@ -41,6 +40,7 @@ export interface GraphData {
abis?: {[key: string]: ContractInterface};
dataSource: {
network: string;
name: string;
};
}
@ -261,10 +261,9 @@ export const instantiate = async (
return toEthereumValue(instanceExports, utils.ParamType.from(typesString), decoded);
},
'ethereum.storageValue': async (contractName: number, contractAddress: number, variable: number, mappingKeys: number) => {
const contractNameString = __getString(contractName);
const address = await Address.wrap(contractAddress);
const addressStringPtr = await address.toHexString();
'ethereum.storageValue': async (variable: number, mappingKeys: number) => {
assert(context.contractAddress);
const addressStringPtr = await __newString(context.contractAddress);
const addressString = __getString(addressStringPtr);
const variableString = __getString(variable);
@ -276,7 +275,7 @@ export const instantiate = async (
});
const mappingKeyValues = await Promise.all(mappingKeyPromises);
const storageLayout = indexer.storageLayoutMap.get(contractNameString);
const storageLayout = indexer.storageLayoutMap.get(dataSource.name);
assert(storageLayout);
assert(context.block);

View File

@ -30,7 +30,8 @@ xdescribe('storage-call wasm tests', () => {
},
dataSource: {
address: contractAddress,
network: 'mainnet'
network: 'mainnet',
name: 'Example1'
}
};

View File

@ -798,11 +798,3 @@ const getEthereumType = (storageTypes: StorageLayout['types'], type: string, map
return utils.ParamType.from(label);
};
export const jsonBigIntStringReplacer = (_: string, value: any) => {
if (typeof value === 'bigint') {
return value.toString();
}
return value;
};

View File

@ -52,7 +52,7 @@ export class GraphWatcher {
// Create wasm instance and contract interface for each dataSource and template in subgraph yaml.
const dataPromises = this._dataSources.map(async (dataSource: any) => {
const { source: { abi }, mapping, network } = dataSource;
const { source: { abi }, mapping, network, name } = dataSource;
const { abis, file } = mapping;
const abisMap = abis.reduce((acc: {[key: string]: ContractInterface}, abi: any) => {
@ -68,7 +68,8 @@ export class GraphWatcher {
const data = {
abis: abisMap,
dataSource: {
network
network,
name
}
};

View File

@ -10,7 +10,7 @@
"deploy-local": "graph deploy --node http://localhost:8020/ --ipfs http://localhost:5001 example1"
},
"dependencies": {
"@graphprotocol/graph-ts": "npm:@vulcanize/graph-ts@0.22.1",
"@graphprotocol/graph-ts": "npm:@vulcanize/graph-ts@0.22.2",
"@vulcanize/graph-cli": "0.22.5"
}
}

View File

@ -165,7 +165,7 @@ export function testGetStorageValue (): void {
// Bind the contract to the address.
const contractAddress = dataSource.address();
const contract = Example1.bind(contractAddress);
const res = contract.getStorageValue('_test', []);
const res = ethereum.getStorageValue('_test', []);
log.debug('Storage call result: {}', [res!.toBigInt().toString()]);
}
@ -176,7 +176,7 @@ export function testMapStorageValue (): void {
const contractAddress = dataSource.address();
const contract = Example1.bind(contractAddress);
const addressValue = ethereum.Value.fromAddress(Address.zero());
const res = contract.getStorageValue('addressUintMap', [addressValue]);
const res = ethereum.getStorageValue('addressUintMap', [addressValue]);
log.debug('Storage call result: {}', [res!.toBigInt().toString()]);
}

View File

@ -23,10 +23,10 @@
chalk "^2.0.0"
js-tokens "^4.0.0"
"@graphprotocol/graph-ts@npm:@vulcanize/graph-ts@0.22.1":
version "0.22.1"
resolved "https://npm.pkg.github.com/download/@vulcanize/graph-ts/0.22.1/7a14baaab8b99d4a88e19620dc7200aa501fbecf#7a14baaab8b99d4a88e19620dc7200aa501fbecf"
integrity sha512-0CoKeFezskYjAsLmqfdxmS7q+gWy1V1wFgiNB4tMJSa2EiPTVG62qlPKkqTduApK2gZX9//rmE5Vb2xcF/v2+w==
"@graphprotocol/graph-ts@npm:@vulcanize/graph-ts@0.22.2":
version "0.22.2"
resolved "https://npm.pkg.github.com/download/@vulcanize/graph-ts/0.22.2/a403a4ef6a5742246c4a1c97695a2f55943eb3a7#a403a4ef6a5742246c4a1c97695a2f55943eb3a7"
integrity sha512-Fscv1owyoeAkS9QsLGXOalMZlb3j0Ge22z+wmpqA6zJHRiSUyyIyiarSz6e0ZTs761oFqqvt00dR6A/4xxf40A==
dependencies:
assemblyscript "0.19.10"

View File

@ -9,7 +9,7 @@ import debug from 'debug';
import fs from 'fs';
import path from 'path';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind } from '@vulcanize/util';
import { Config, DEFAULT_CONFIG_PATH, getConfig, initClients, JobQueue, StateKind, verifyCheckpointData } from '@vulcanize/util';
import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import * as codec from '@ipld/dag-cbor';
@ -34,6 +34,18 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
},
verify: {
alias: 'v',
type: 'boolean',
describe: 'Verify checkpoint',
default: true
}
}).argv;
@ -92,13 +104,22 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
await indexer.createCheckpoint(contract.address, block.blockHash);
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);
const data = indexer.getIPLDData(ipldBlock);
if (argv.verify) {
log(`Verifying checkpoint data for contract ${contract.address}`);
await verifyCheckpointData(graphDb, ipldBlock.block, data);
log('Checkpoint data verified');
}
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}

View File

@ -71,11 +71,9 @@ export const handler = async (argv: any): Promise<void> => {
try {
const entities = [BlockProgress, GetMethod, _Test, Author, Category, Blog];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -96,10 +96,10 @@ export class Database implements IPLDDatabaseInterface {
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startBlock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {

View File

@ -33,6 +33,12 @@ const main = async (): Promise<void> => {
alias: 'o',
type: 'string',
describe: 'Export file path'
},
createCheckpoint: {
alias: 'c',
type: 'boolean',
describe: 'Create new checkpoint',
default: false
}
}).argv;
@ -83,7 +89,10 @@ const main = async (): Promise<void> => {
// Create and export checkpoint if checkpointing is on for the contract.
if (contract.checkpoint) {
await indexer.createCheckpoint(contract.address, block.blockHash);
if (argv.createCheckpoint) {
log(`Creating checkpoint at block ${block.blockNumber}`);
await indexer.createCheckpoint(contract.address, block.blockHash);
}
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, StateKind.Checkpoint, block.blockNumber);
assert(ipldBlock);

View File

@ -61,11 +61,9 @@ export const handler = async (argv: any): Promise<void> => {
try {
const entities = [BlockProgress, MultiNonce, _Owner, IsRevoked, IsPhisher, IsMember];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
});
await Promise.all(removeEntitiesPromise);
for (const entity of entities) {
await db.removeEntities<any>(dbTx, entity, { blockNumber: MoreThan(argv.blockNumber) });
}
const syncStatus = await indexer.getSyncStatus();
assert(syncStatus, 'Missing syncStatus');

View File

@ -155,10 +155,10 @@ export class Database implements IPLDDatabaseInterface {
}
// Fetch all diff IPLDBlocks after the specified block number.
async getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlock[]> {
async getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlock[]> {
const repo = this._conn.getRepository(IPLDBlock);
return this._baseDatabase.getDiffIPLDBlocksByBlocknumber(repo, contractAddress, blockNumber);
return this._baseDatabase.getDiffIPLDBlocksInRange(repo, contractAddress, startBlock, endBlock);
}
async saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlock): Promise<IPLDBlock> {

View File

@ -111,7 +111,7 @@ export const processBlockByNumber = async (
*/
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<void> => {
// Check if block processing is complete.
while (!block.isComplete) {
while (block.numProcessedEvents < block.numEvents) {
console.time('time:common#processBacthEvents-fetching_events_batch');
// Fetch events in batches
@ -193,6 +193,11 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
}
if (indexer.processBlockAfterEvents) {
await indexer.processBlockAfterEvents(block.blockHash);
if (!block.isComplete || block.numEvents === 0) {
await indexer.processBlockAfterEvents(block.blockHash);
}
}
block.isComplete = true;
await indexer.updateBlockProgress(block, block.lastProcessedEventIndex);
};

View File

@ -179,22 +179,17 @@ export class Database {
block.lastProcessedEventIndex = lastProcessedEventIndex;
block.numProcessedEvents++;
if (block.numProcessedEvents >= block.numEvents) {
block.isComplete = true;
}
const { generatedMaps } = await repo.createQueryBuilder()
.update()
.set(block)
.where('id = :id', { id: block.id })
.whereEntity(block)
.returning('*')
.execute();
block = generatedMaps[0] as BlockProgressInterface;
}
return block;
const { generatedMaps } = await repo.createQueryBuilder()
.update()
.set(block)
.where('id = :id', { id: block.id })
.whereEntity(block)
.returning('*')
.execute();
return generatedMaps[0] as BlockProgressInterface;
}
async markBlocksAsPruned (repo: Repository<BlockProgressInterface>, blocks: BlockProgressInterface[]): Promise<void> {

View File

@ -2,7 +2,7 @@
// Copyright 2021 Vulcanize, Inc.
//
import { FindConditions, MoreThan, Repository } from 'typeorm';
import { Between, FindConditions, Repository } from 'typeorm';
import assert from 'assert';
import { IPLDBlockInterface, IpldStatusInterface, StateKind } from './types';
@ -28,6 +28,9 @@ export class IPLDDatabase extends Database {
: queryBuilder.andWhere('ipld_block.kind != :kind', { kind: StateKind.DiffStaged })
.addOrderBy('ipld_block.id', 'DESC');
// Get the first entry.
queryBuilder.limit(1);
return queryBuilder.getOne();
}
@ -102,6 +105,9 @@ export class IPLDDatabase extends Database {
? queryBuilder.andWhere('ipld_block.kind = :kind', { kind })
: queryBuilder.addOrderBy('ipld_block.id', 'DESC');
// Get the first entry.
queryBuilder.limit(1);
result = await queryBuilder.getOne();
}
@ -112,7 +118,7 @@ export class IPLDDatabase extends Database {
return repo.find({ where, relations: ['block'] });
}
async getDiffIPLDBlocksByBlocknumber (repo: Repository<IPLDBlockInterface>, contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]> {
async getDiffIPLDBlocksInRange (repo: Repository<IPLDBlockInterface>, contractAddress: string, startblock: number, endBlock: number): Promise<IPLDBlockInterface[]> {
return repo.find({
relations: ['block'],
where: {
@ -120,7 +126,7 @@ export class IPLDDatabase extends Database {
kind: StateKind.Diff,
block: {
isPruned: false,
blockNumber: MoreThan(blockNumber)
blockNumber: Between(startblock + 1, endBlock)
}
},
order: {

View File

@ -1,4 +1,10 @@
import _ from 'lodash';
import debug from 'debug';
import { BlockProgressInterface, GraphDatabaseInterface } from './types';
import { jsonBigIntStringReplacer } from './misc';
const log = debug('vulcanize:ipld-helper');
export const updateStateForElementaryType = (initialObject: any, stateVariable: string, value: any): any => {
const object = _.cloneDeep(initialObject);
@ -14,3 +20,51 @@ export const updateStateForMappingType = (initialObject: any, stateVariable: str
// Use _.setWith() with Object as customizer as _.set() treats numeric value in path as an index to an array.
return _.setWith(object, keys, value, Object);
};
export const verifyCheckpointData = async (database: GraphDatabaseInterface, block: BlockProgressInterface, data: any) => {
const { state } = data;
for (const [entityName, idEntityMap] of Object.entries(state)) {
for (const [id, ipldEntity] of Object.entries(idEntityMap as {[key: string]: any})) {
const entityData = await database.getEntity(entityName, id, block.blockHash) as any;
// Compare entities.
const diffFound = Object.keys(ipldEntity)
.some(key => {
let ipldValue = ipldEntity[key];
if (key === 'blockNumber') {
entityData.blockNumber = entityData._blockNumber;
}
if (key === 'blockHash') {
entityData.blockHash = entityData._blockHash;
}
if (typeof ipldEntity[key] === 'object' && ipldEntity[key]?.id) {
ipldValue = ipldEntity[key].id;
}
if (
Array.isArray(ipldEntity[key]) &&
ipldEntity[key].length &&
ipldEntity[key][0].id
) {
// Map IPLD entity 1 to N relation field array to match DB entity.
ipldValue = ipldEntity[key].map(({ id }: { id: string }) => id);
// Sort DB entity 1 to N relation field array.
entityData[key] = entityData[key].sort((a: string, b: string) => a.localeCompare(b));
}
return JSON.stringify(ipldValue) !== JSON.stringify(entityData[key], jsonBigIntStringReplacer);
});
if (diffFound) {
const message = `Diff found for checkpoint at block ${block.blockNumber} in entity ${entityName} id ${id}`;
log(message);
throw new Error(message);
}
}
}
};

View File

@ -305,14 +305,14 @@ export class IPLDIndexer extends Indexer {
// Fetch the latest 'checkpoint' | 'init' for the contract to fetch diffs after it.
let prevNonDiffBlock: IPLDBlockInterface;
let getDiffBlockNumber: number;
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber);
let diffStartBlockNumber: number;
const checkpointBlock = await this._ipldDb.getLatestIPLDBlock(contractAddress, StateKind.Checkpoint, currentBlock.blockNumber - 1);
if (checkpointBlock) {
const checkpointBlockNumber = checkpointBlock.block.blockNumber;
prevNonDiffBlock = checkpointBlock;
getDiffBlockNumber = checkpointBlockNumber;
diffStartBlockNumber = checkpointBlockNumber;
// Update IPLD status map with the latest checkpoint info.
// Essential while importing state as checkpoint at the snapshot block is added by import-state CLI.
@ -325,11 +325,11 @@ export class IPLDIndexer extends Indexer {
prevNonDiffBlock = initBlock;
// Take block number previous to initial state block as the checkpoint is to be created in the same block.
getDiffBlockNumber = initBlock.block.blockNumber - 1;
diffStartBlockNumber = initBlock.block.blockNumber - 1;
}
// Fetching all diff blocks after the latest 'checkpoint' | 'init'.
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksByBlocknumber(contractAddress, getDiffBlockNumber);
const diffBlocks = await this._ipldDb.getDiffIPLDBlocksInRange(contractAddress, diffStartBlockNumber, currentBlock.blockNumber);
const prevNonDiffBlockData = codec.decode(Buffer.from(prevNonDiffBlock.data)) as any;
const data = {
@ -358,7 +358,8 @@ export class IPLDIndexer extends Indexer {
let currentIPLDBlock: IPLDBlockInterface | undefined;
const prevIPLDBlockNumber = ipldStatus[kind];
if (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber) {
// Fetch from DB for previous IPLD block or for checkpoint kind.
if (kind === 'checkpoint' || (prevIPLDBlockNumber && prevIPLDBlockNumber === block.blockNumber)) {
const currentIPLDBlocks = await this._ipldDb.getIPLDBlocks({ block, contractAddress, kind });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.

View File

@ -238,3 +238,11 @@ export const getFullTransaction = async (ethClient: EthClient, txHash: string):
maxFeePerGas: txData.maxFeePerGas?.toString()
};
};
export const jsonBigIntStringReplacer = (_: string, value: any) => {
if (typeof value === 'bigint') {
return value.toString();
}
return value;
};

View File

@ -152,9 +152,13 @@ export interface DatabaseInterface {
export interface IPLDDatabaseInterface extends DatabaseInterface {
getLatestIPLDBlock (contractAddress: string, kind: StateKind | null, blockNumber?: number): Promise<IPLDBlockInterface | undefined>
getIPLDBlocks (where: FindConditions<IPLDBlockInterface>): Promise<IPLDBlockInterface[]>
getDiffIPLDBlocksByBlocknumber (contractAddress: string, blockNumber: number): Promise<IPLDBlockInterface[]>
getDiffIPLDBlocksInRange (contractAddress: string, startBlock: number, endBlock: number): Promise<IPLDBlockInterface[]>
getNewIPLDBlock (): IPLDBlockInterface
removeIPLDBlocks(dbTx: QueryRunner, blockNumber: number, kind: StateKind): Promise<void>
saveOrUpdateIPLDBlock (dbTx: QueryRunner, ipldBlock: IPLDBlockInterface): Promise<IPLDBlockInterface>
getIPLDStatus (): Promise<IpldStatusInterface | undefined>
}
export interface GraphDatabaseInterface {
getEntity<Entity> (entity: (new () => Entity) | string, id: string, blockHash?: string): Promise<Entity | undefined>;
}