Create external checkpoints after hook processing and clear subgraph entities on reset (#61)

* Create checkpoint using CLI after the hooks have been processed

* Push checkpoints created while exporting state to IPFS

* Clear subgraph entities on reset state

* Remove unnecessary staged IPLD blocks created while importing state
This commit is contained in:
prathamesh0 2021-11-24 15:13:00 +05:30 committed by nabarun
parent d979e51723
commit 129ba694fb
10 changed files with 111 additions and 44 deletions

View File

@ -70,8 +70,8 @@ const main = async (): Promise<void> => {
const contracts = await db.getContracts();
// Get latest canonical block.
const block = await indexer.getLatestCanonicalBlock();
// Get latest block with hooks processed.
const block = await indexer.getLatestHooksProcessedBlock();
assert(block);
// Export snapshot block.
@ -96,7 +96,11 @@ const main = async (): Promise<void> => {
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber);
assert(ipldBlock);
const data = codec.decode(Buffer.from(ipldBlock.data)) as any;
const data = indexer.getIPLDData(ipldBlock);
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}
exportData.ipldCheckpoints.push({
contractAddress: ipldBlock.contractAddress,

View File

@ -110,6 +110,9 @@ export const main = async (): Promise<any> => {
await db.saveOrUpdateIPLDBlock(ipldBlock);
}
// The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStagedIPLDBlocks(block.blockNumber);
};
main().catch(err => {

View File

@ -13,6 +13,24 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node';
import { Database } from '../../database';
import { Indexer } from '../../indexer';
import { BlockProgress } from '../../entity/BlockProgress';
import { Producer } from '../../entity/Producer';
import { ProducerSet } from '../../entity/ProducerSet';
import { ProducerSetChange } from '../../entity/ProducerSetChange';
import { ProducerRewardCollectorChange } from '../../entity/ProducerRewardCollectorChange';
import { RewardScheduleEntry } from '../../entity/RewardScheduleEntry';
import { RewardSchedule } from '../../entity/RewardSchedule';
import { ProducerEpoch } from '../../entity/ProducerEpoch';
import { Block } from '../../entity/Block';
import { Epoch } from '../../entity/Epoch';
import { SlotClaim } from '../../entity/SlotClaim';
import { Slot } from '../../entity/Slot';
import { Staker } from '../../entity/Staker';
import { Network } from '../../entity/Network';
import { Distributor } from '../../entity/Distributor';
import { Distribution } from '../../entity/Distribution';
import { Claim } from '../../entity/Claim';
import { Slash } from '../../entity/Slash';
import { Account } from '../../entity/Account';
const log = debug('vulcanize:reset-state');
@ -68,7 +86,7 @@ export const handler = async (argv: any): Promise<void> => {
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress];
const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });

View File

@ -152,7 +152,6 @@ export class EventWatcher implements EventWatcherInterface {
// If it's a pruning job: Create a hook job for the latest canonical block.
if (kind === JOB_KIND_PRUNE) {
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
assert(latestCanonicalBlock);
await this._jobQueue.pushJob(
QUEUE_HOOKS,

View File

@ -327,32 +327,31 @@ export class Indexer implements IndexerInterface {
const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash);
assert(checkpointBlockHash);
const block = await this.getBlockProgress(checkpointBlockHash);
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
// Push checkpoint to IPFS if configured.
if (this.isIPFSConfigured()) {
const block = await this.getBlockProgress(checkpointBlockHash);
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
}
return checkpointBlockHash;
}
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
// Getting the current block.
let currentBlock;
if (blockHash) {
currentBlock = await this.getBlockProgress(blockHash);
} else {
// In case of empty blockHash from checkpoint CLI, get the latest canonical block for the checkpoint.
currentBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
currentBlock = await this.getLatestHooksProcessedBlock();
}
assert(currentBlock);
@ -371,8 +370,11 @@ export class Indexer implements IndexerInterface {
// Make sure the block is marked complete.
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
// Make sure the block is in the pruned region.
assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region');
const hookStatus = await this.getHookStatus();
assert(hookStatus);
// Make sure the hooks have been processed for the block.
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
// Fetch the latest checkpoint for the contract.
const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber);
@ -1035,11 +1037,26 @@ export class Indexer implements IndexerInterface {
return res;
}
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
async getLatestCanonicalBlock (): Promise<BlockProgress> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
return this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);
return latestCanonicalBlock;
}
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
const hookStatus = await this.getHookStatus();
assert(hookStatus);
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
assert(blocksAtHeight.length === 1);
return blocksAtHeight[0];
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {

View File

@ -70,8 +70,8 @@ const main = async (): Promise<void> => {
const contracts = await db.getContracts();
// Get latest canonical block.
const block = await indexer.getLatestCanonicalBlock();
// Get latest block with hooks processed.
const block = await indexer.getLatestHooksProcessedBlock();
assert(block);
// Export snapshot block.
@ -96,7 +96,11 @@ const main = async (): Promise<void> => {
const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber);
assert(ipldBlock);
const data = codec.decode(Buffer.from(ipldBlock.data)) as any;
const data = indexer.getIPLDData(ipldBlock);
if (indexer.isIPFSConfigured()) {
await indexer.pushToIPFS(data);
}
exportData.ipldCheckpoints.push({
contractAddress: ipldBlock.contractAddress,

View File

@ -110,6 +110,9 @@ export const main = async (): Promise<any> => {
await db.saveOrUpdateIPLDBlock(ipldBlock);
}
// The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block.
await indexer.removeStagedIPLDBlocks(block.blockNumber);
};
main().catch(err => {

View File

@ -16,6 +16,9 @@ import { BlockProgress } from '../../entity/BlockProgress';
import { GetMethod } from '../../entity/GetMethod';
import { _Test } from '../../entity/_Test';
import { ExampleEntity } from '../../entity/ExampleEntity';
import { RelatedEntity } from '../../entity/RelatedEntity';
import { ManyRelatedEntity } from '../../entity/ManyRelatedEntity';
const log = debug('vulcanize:reset-state');
@ -71,7 +74,7 @@ export const handler = async (argv: any): Promise<void> => {
const dbTx = await db.createTransactionRunner();
try {
const entities = [BlockProgress, GetMethod, _Test];
const entities = [BlockProgress, GetMethod, _Test, ExampleEntity, ManyRelatedEntity, RelatedEntity];
const removeEntitiesPromise = entities.map(async entityClass => {
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });

View File

@ -152,7 +152,6 @@ export class EventWatcher implements EventWatcherInterface {
// If it's a pruning job: Create a hook job for the latest canonical block.
if (kind === JOB_KIND_PRUNE) {
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
assert(latestCanonicalBlock);
await this._jobQueue.pushJob(
QUEUE_HOOKS,

View File

@ -324,32 +324,31 @@ export class Indexer implements IndexerInterface {
const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash);
assert(checkpointBlockHash);
const block = await this.getBlockProgress(checkpointBlockHash);
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
// Push checkpoint to IPFS if configured.
if (this.isIPFSConfigured()) {
const block = await this.getBlockProgress(checkpointBlockHash);
const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' });
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
// There can be at most one IPLDBlock for a (block, contractAddress, kind) combination.
assert(checkpointIPLDBlocks.length <= 1);
const checkpointIPLDBlock = checkpointIPLDBlocks[0];
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
const checkpointData = this.getIPLDData(checkpointIPLDBlock);
await this.pushToIPFS(checkpointData);
}
return checkpointBlockHash;
}
async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise<string | undefined> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
// Getting the current block.
let currentBlock;
if (blockHash) {
currentBlock = await this.getBlockProgress(blockHash);
} else {
// In case of empty blockHash from checkpoint CLI, get the latest canonical block for the checkpoint.
currentBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
// In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint.
currentBlock = await this.getLatestHooksProcessedBlock();
}
assert(currentBlock);
@ -368,8 +367,11 @@ export class Indexer implements IndexerInterface {
// Make sure the block is marked complete.
assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete');
// Make sure the block is in the pruned region.
assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region');
const hookStatus = await this.getHookStatus();
assert(hookStatus);
// Make sure the hooks have been processed for the block.
assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed');
// Fetch the latest checkpoint for the contract.
const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber);
@ -633,11 +635,26 @@ export class Indexer implements IndexerInterface {
return res;
}
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
async getLatestCanonicalBlock (): Promise<BlockProgress> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);
return this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);
return latestCanonicalBlock;
}
async getLatestHooksProcessedBlock (): Promise<BlockProgress> {
const hookStatus = await this.getHookStatus();
assert(hookStatus);
const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false);
// There can exactly one block at hookStatus.latestProcessedBlockNumber height.
assert(blocksAtHeight.length === 1);
return blocksAtHeight[0];
}
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {