diff --git a/packages/eden-watcher/src/cli/export-state.ts b/packages/eden-watcher/src/cli/export-state.ts index 8a558af2..9a7c3249 100644 --- a/packages/eden-watcher/src/cli/export-state.ts +++ b/packages/eden-watcher/src/cli/export-state.ts @@ -70,8 +70,8 @@ const main = async (): Promise => { const contracts = await db.getContracts(); - // Get latest canonical block. - const block = await indexer.getLatestCanonicalBlock(); + // Get latest block with hooks processed. + const block = await indexer.getLatestHooksProcessedBlock(); assert(block); // Export snapshot block. @@ -96,7 +96,11 @@ const main = async (): Promise => { const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber); assert(ipldBlock); - const data = codec.decode(Buffer.from(ipldBlock.data)) as any; + const data = indexer.getIPLDData(ipldBlock); + + if (indexer.isIPFSConfigured()) { + await indexer.pushToIPFS(data); + } exportData.ipldCheckpoints.push({ contractAddress: ipldBlock.contractAddress, diff --git a/packages/eden-watcher/src/cli/import-state.ts b/packages/eden-watcher/src/cli/import-state.ts index 433a18df..aa2426c6 100644 --- a/packages/eden-watcher/src/cli/import-state.ts +++ b/packages/eden-watcher/src/cli/import-state.ts @@ -110,6 +110,9 @@ export const main = async (): Promise => { await db.saveOrUpdateIPLDBlock(ipldBlock); } + + // The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block. + await indexer.removeStagedIPLDBlocks(block.blockNumber); }; main().catch(err => { diff --git a/packages/eden-watcher/src/cli/reset-cmds/state.ts b/packages/eden-watcher/src/cli/reset-cmds/state.ts index a14ba5d9..3f60696d 100644 --- a/packages/eden-watcher/src/cli/reset-cmds/state.ts +++ b/packages/eden-watcher/src/cli/reset-cmds/state.ts @@ -13,6 +13,24 @@ import { GraphWatcher, Database as GraphDatabase } from '@vulcanize/graph-node'; import { Database } from '../../database'; import { Indexer } from '../../indexer'; import { BlockProgress } from '../../entity/BlockProgress'; +import { Producer } from '../../entity/Producer'; +import { ProducerSet } from '../../entity/ProducerSet'; +import { ProducerSetChange } from '../../entity/ProducerSetChange'; +import { ProducerRewardCollectorChange } from '../../entity/ProducerRewardCollectorChange'; +import { RewardScheduleEntry } from '../../entity/RewardScheduleEntry'; +import { RewardSchedule } from '../../entity/RewardSchedule'; +import { ProducerEpoch } from '../../entity/ProducerEpoch'; +import { Block } from '../../entity/Block'; +import { Epoch } from '../../entity/Epoch'; +import { SlotClaim } from '../../entity/SlotClaim'; +import { Slot } from '../../entity/Slot'; +import { Staker } from '../../entity/Staker'; +import { Network } from '../../entity/Network'; +import { Distributor } from '../../entity/Distributor'; +import { Distribution } from '../../entity/Distribution'; +import { Claim } from '../../entity/Claim'; +import { Slash } from '../../entity/Slash'; +import { Account } from '../../entity/Account'; const log = debug('vulcanize:reset-state'); @@ -68,7 +86,7 @@ export const handler = async (argv: any): Promise => { const dbTx = await db.createTransactionRunner(); try { - const entities = [BlockProgress]; + const entities = [BlockProgress, Producer, ProducerSet, ProducerSetChange, ProducerRewardCollectorChange, RewardScheduleEntry, RewardSchedule, ProducerEpoch, Block, Epoch, SlotClaim, Slot, Staker, Network, Distributor, Distribution, Claim, Slash, Account]; const removeEntitiesPromise = entities.map(async entityClass => { return db.removeEntities(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); diff --git a/packages/eden-watcher/src/events.ts b/packages/eden-watcher/src/events.ts index 0bae9fe6..9befb6ca 100644 --- a/packages/eden-watcher/src/events.ts +++ b/packages/eden-watcher/src/events.ts @@ -152,7 +152,6 @@ export class EventWatcher implements EventWatcherInterface { // If it's a pruning job: Create a hook job for the latest canonical block. if (kind === JOB_KIND_PRUNE) { const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); - assert(latestCanonicalBlock); await this._jobQueue.pushJob( QUEUE_HOOKS, diff --git a/packages/eden-watcher/src/indexer.ts b/packages/eden-watcher/src/indexer.ts index fe374dff..118330af 100644 --- a/packages/eden-watcher/src/indexer.ts +++ b/packages/eden-watcher/src/indexer.ts @@ -327,32 +327,31 @@ export class Indexer implements IndexerInterface { const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash); assert(checkpointBlockHash); - const block = await this.getBlockProgress(checkpointBlockHash); - const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' }); + // Push checkpoint to IPFS if configured. + if (this.isIPFSConfigured()) { + const block = await this.getBlockProgress(checkpointBlockHash); + const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' }); - // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. - assert(checkpointIPLDBlocks.length <= 1); - const checkpointIPLDBlock = checkpointIPLDBlocks[0]; + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. + assert(checkpointIPLDBlocks.length <= 1); + const checkpointIPLDBlock = checkpointIPLDBlocks[0]; - const checkpointData = this.getIPLDData(checkpointIPLDBlock); - - await this.pushToIPFS(checkpointData); + const checkpointData = this.getIPLDData(checkpointIPLDBlock); + await this.pushToIPFS(checkpointData); + } return checkpointBlockHash; } async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); - // Getting the current block. let currentBlock; if (blockHash) { currentBlock = await this.getBlockProgress(blockHash); } else { - // In case of empty blockHash from checkpoint CLI, get the latest canonical block for the checkpoint. - currentBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + // In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint. + currentBlock = await this.getLatestHooksProcessedBlock(); } assert(currentBlock); @@ -371,8 +370,11 @@ export class Indexer implements IndexerInterface { // Make sure the block is marked complete. assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete'); - // Make sure the block is in the pruned region. - assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region'); + const hookStatus = await this.getHookStatus(); + assert(hookStatus); + + // Make sure the hooks have been processed for the block. + assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed'); // Fetch the latest checkpoint for the contract. const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber); @@ -1035,11 +1037,26 @@ export class Indexer implements IndexerInterface { return res; } - async getLatestCanonicalBlock (): Promise { + async getLatestCanonicalBlock (): Promise { const syncStatus = await this.getSyncStatus(); assert(syncStatus); - return this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + assert(latestCanonicalBlock); + + return latestCanonicalBlock; + } + + async getLatestHooksProcessedBlock (): Promise { + const hookStatus = await this.getHookStatus(); + assert(hookStatus); + + const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false); + + // There can exactly one block at hookStatus.latestProcessedBlockNumber height. + assert(blocksAtHeight.length === 1); + + return blocksAtHeight[0]; } async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { diff --git a/packages/graph-test-watcher/src/cli/export-state.ts b/packages/graph-test-watcher/src/cli/export-state.ts index 8a558af2..9a7c3249 100644 --- a/packages/graph-test-watcher/src/cli/export-state.ts +++ b/packages/graph-test-watcher/src/cli/export-state.ts @@ -70,8 +70,8 @@ const main = async (): Promise => { const contracts = await db.getContracts(); - // Get latest canonical block. - const block = await indexer.getLatestCanonicalBlock(); + // Get latest block with hooks processed. + const block = await indexer.getLatestHooksProcessedBlock(); assert(block); // Export snapshot block. @@ -96,7 +96,11 @@ const main = async (): Promise => { const ipldBlock = await indexer.getLatestIPLDBlock(contract.address, 'checkpoint', block.blockNumber); assert(ipldBlock); - const data = codec.decode(Buffer.from(ipldBlock.data)) as any; + const data = indexer.getIPLDData(ipldBlock); + + if (indexer.isIPFSConfigured()) { + await indexer.pushToIPFS(data); + } exportData.ipldCheckpoints.push({ contractAddress: ipldBlock.contractAddress, diff --git a/packages/graph-test-watcher/src/cli/import-state.ts b/packages/graph-test-watcher/src/cli/import-state.ts index 433a18df..aa2426c6 100644 --- a/packages/graph-test-watcher/src/cli/import-state.ts +++ b/packages/graph-test-watcher/src/cli/import-state.ts @@ -110,6 +110,9 @@ export const main = async (): Promise => { await db.saveOrUpdateIPLDBlock(ipldBlock); } + + // The staged IPLD blocks are unnecessary as checkpoints have been already created for the snapshot block. + await indexer.removeStagedIPLDBlocks(block.blockNumber); }; main().catch(err => { diff --git a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts index efd76e28..45d6ef91 100644 --- a/packages/graph-test-watcher/src/cli/reset-cmds/state.ts +++ b/packages/graph-test-watcher/src/cli/reset-cmds/state.ts @@ -16,6 +16,9 @@ import { BlockProgress } from '../../entity/BlockProgress'; import { GetMethod } from '../../entity/GetMethod'; import { _Test } from '../../entity/_Test'; +import { ExampleEntity } from '../../entity/ExampleEntity'; +import { RelatedEntity } from '../../entity/RelatedEntity'; +import { ManyRelatedEntity } from '../../entity/ManyRelatedEntity'; const log = debug('vulcanize:reset-state'); @@ -71,7 +74,7 @@ export const handler = async (argv: any): Promise => { const dbTx = await db.createTransactionRunner(); try { - const entities = [BlockProgress, GetMethod, _Test]; + const entities = [BlockProgress, GetMethod, _Test, ExampleEntity, ManyRelatedEntity, RelatedEntity]; const removeEntitiesPromise = entities.map(async entityClass => { return db.removeEntities(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) }); diff --git a/packages/graph-test-watcher/src/events.ts b/packages/graph-test-watcher/src/events.ts index 0bae9fe6..9befb6ca 100644 --- a/packages/graph-test-watcher/src/events.ts +++ b/packages/graph-test-watcher/src/events.ts @@ -152,7 +152,6 @@ export class EventWatcher implements EventWatcherInterface { // If it's a pruning job: Create a hook job for the latest canonical block. if (kind === JOB_KIND_PRUNE) { const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock(); - assert(latestCanonicalBlock); await this._jobQueue.pushJob( QUEUE_HOOKS, diff --git a/packages/graph-test-watcher/src/indexer.ts b/packages/graph-test-watcher/src/indexer.ts index b01ecc09..ba627667 100644 --- a/packages/graph-test-watcher/src/indexer.ts +++ b/packages/graph-test-watcher/src/indexer.ts @@ -324,32 +324,31 @@ export class Indexer implements IndexerInterface { const checkpointBlockHash = await this.createCheckpoint(contractAddress, blockHash); assert(checkpointBlockHash); - const block = await this.getBlockProgress(checkpointBlockHash); - const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' }); + // Push checkpoint to IPFS if configured. + if (this.isIPFSConfigured()) { + const block = await this.getBlockProgress(checkpointBlockHash); + const checkpointIPLDBlocks = await this._db.getIPLDBlocks({ block, contractAddress, kind: 'checkpoint' }); - // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. - assert(checkpointIPLDBlocks.length <= 1); - const checkpointIPLDBlock = checkpointIPLDBlocks[0]; + // There can be at most one IPLDBlock for a (block, contractAddress, kind) combination. + assert(checkpointIPLDBlocks.length <= 1); + const checkpointIPLDBlock = checkpointIPLDBlocks[0]; - const checkpointData = this.getIPLDData(checkpointIPLDBlock); - - await this.pushToIPFS(checkpointData); + const checkpointData = this.getIPLDData(checkpointIPLDBlock); + await this.pushToIPFS(checkpointData); + } return checkpointBlockHash; } async createCheckpoint (contractAddress: string, blockHash?: string, data?: any, checkpointInterval?: number): Promise { - const syncStatus = await this.getSyncStatus(); - assert(syncStatus); - // Getting the current block. let currentBlock; if (blockHash) { currentBlock = await this.getBlockProgress(blockHash); } else { - // In case of empty blockHash from checkpoint CLI, get the latest canonical block for the checkpoint. - currentBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + // In case of empty blockHash from checkpoint CLI, get the latest processed block from hookStatus for the checkpoint. + currentBlock = await this.getLatestHooksProcessedBlock(); } assert(currentBlock); @@ -368,8 +367,11 @@ export class Indexer implements IndexerInterface { // Make sure the block is marked complete. assert(currentBlock.isComplete, 'Block for a checkpoint should be marked as complete'); - // Make sure the block is in the pruned region. - assert(currentBlock.blockNumber <= syncStatus.latestCanonicalBlockNumber, 'Block for a checkpoint should be in the pruned region'); + const hookStatus = await this.getHookStatus(); + assert(hookStatus); + + // Make sure the hooks have been processed for the block. + assert(currentBlock.blockNumber <= hookStatus.latestProcessedBlockNumber, 'Block for a checkpoint should have hooks processed'); // Fetch the latest checkpoint for the contract. const checkpointBlock = await this.getLatestIPLDBlock(contractAddress, 'checkpoint', currentBlock.blockNumber); @@ -633,11 +635,26 @@ export class Indexer implements IndexerInterface { return res; } - async getLatestCanonicalBlock (): Promise { + async getLatestCanonicalBlock (): Promise { const syncStatus = await this.getSyncStatus(); assert(syncStatus); - return this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash); + assert(latestCanonicalBlock); + + return latestCanonicalBlock; + } + + async getLatestHooksProcessedBlock (): Promise { + const hookStatus = await this.getHookStatus(); + assert(hookStatus); + + const blocksAtHeight = await this.getBlocksAtHeight(hookStatus.latestProcessedBlockNumber, false); + + // There can exactly one block at hookStatus.latestProcessedBlockNumber height. + assert(blocksAtHeight.length === 1); + + return blocksAtHeight[0]; } async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise {