mirror of
https://github.com/cerc-io/watcher-ts
synced 2025-01-08 12:28:05 +00:00
Implement reset job queue and state CLI in watchers (#276)
* Implement clean jobs CLI in watchers * Pull common code cleanJobs to util * Implement commands for reset job-queue and state * Reset command for job-queues and watcher state * Reset sync status on reset state * Fix using cli from built js * Implement skipCheck to reset syncStatus table and skip complete handler on fail * Check for block isComplete on reset state * Set default value for force param to update syncStatus to false * Reset tables in erc20 watcher * Push job in fill after previos block is complete
This commit is contained in:
parent
f00ea0c1f0
commit
47b9e6bbbd
@ -70,15 +70,27 @@ export class EventWatcher {
|
||||
|
||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { id, data: { failed } } = job;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
});
|
||||
}
|
||||
|
||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
const { id, data: { request, failed, state, createdOn } } = job;
|
||||
|
||||
const { data: { request, failed, state, createdOn } } = job;
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
|
||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
|
||||
|
@ -26,7 +26,9 @@
|
||||
"token:transfer-from": "hardhat --network localhost token-transfer-from",
|
||||
"token:transfer-from:docker": "hardhat --network docker token-transfer-from",
|
||||
"block:latest": "hardhat --network localhost block-latest",
|
||||
"block:latest:docker": "hardhat --network docker block-latest"
|
||||
"block:latest:docker": "hardhat --network docker block-latest",
|
||||
"reset": "DEBUG=vulcanize:* node --enable-source-maps dist/cli/reset.js",
|
||||
"reset:dev": "DEBUG=vulcanize:* ts-node src/cli/reset.ts"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
|
22
packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
22
packages/erc20-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
@ -0,0 +1,22 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
|
||||
import { getConfig, resetJobs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset-job-queue');
|
||||
|
||||
export const command = 'job-queue';
|
||||
|
||||
export const desc = 'Reset job queue';
|
||||
|
||||
export const builder = {};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
|
||||
log('Job queue reset successfully');
|
||||
};
|
74
packages/erc20-watcher/src/cli/reset-cmds/state.ts
Normal file
74
packages/erc20-watcher/src/cli/reset-cmds/state.ts
Normal file
@ -0,0 +1,74 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
import { MoreThan } from 'typeorm';
|
||||
import assert from 'assert';
|
||||
|
||||
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
|
||||
|
||||
import { Database } from '../../database';
|
||||
import { Indexer } from '../../indexer';
|
||||
import { BlockProgress } from '../../entity/BlockProgress';
|
||||
import { Allowance } from '../../entity/Allowance';
|
||||
import { Balance } from '../../entity/Balance';
|
||||
|
||||
const log = debug('vulcanize:reset-state');
|
||||
|
||||
export const command = 'state';
|
||||
|
||||
export const desc = 'Reset state to block number';
|
||||
|
||||
export const builder = {
|
||||
blockNumber: {
|
||||
type: 'number'
|
||||
}
|
||||
};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
const { dbConfig, serverConfig, ethClient, ethProvider } = await getResetConfig(config);
|
||||
|
||||
// Initialize database.
|
||||
const db = new Database(dbConfig);
|
||||
await db.init();
|
||||
|
||||
const indexer = new Indexer(db, ethClient, ethProvider, serverConfig.mode);
|
||||
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
assert(syncStatus, 'Missing syncStatus');
|
||||
|
||||
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
|
||||
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
|
||||
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
|
||||
const [blockProgress] = blockProgresses;
|
||||
|
||||
const dbTx = await db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
const removeEntitiesPromise = [BlockProgress, Allowance, Balance].map(async entityClass => {
|
||||
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
|
||||
});
|
||||
|
||||
await Promise.all(removeEntitiesPromise);
|
||||
|
||||
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
log('Reset state successfully');
|
||||
};
|
24
packages/erc20-watcher/src/cli/reset.ts
Normal file
24
packages/erc20-watcher/src/cli/reset.ts
Normal file
@ -0,0 +1,24 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import 'reflect-metadata';
|
||||
import debug from 'debug';
|
||||
|
||||
import { getResetYargs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset');
|
||||
|
||||
const main = async () => {
|
||||
return getResetYargs()
|
||||
.commandDir('reset-cmds', { extensions: ['ts', 'js'], exclude: /([a-zA-Z0-9\s_\\.\-:])+(.d.ts)$/ })
|
||||
.demandCommand(1)
|
||||
.help()
|
||||
.argv;
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
process.exit();
|
||||
}).catch(err => {
|
||||
log(err);
|
||||
});
|
@ -64,15 +64,15 @@ export class Database {
|
||||
.getOne();
|
||||
}
|
||||
|
||||
async saveBalance ({ blockHash, token, owner, value, proof }: DeepPartial<Balance>): Promise<Balance> {
|
||||
async saveBalance ({ blockHash, blockNumber, token, owner, value, proof }: DeepPartial<Balance>): Promise<Balance> {
|
||||
const repo = this._conn.getRepository(Balance);
|
||||
const entity = repo.create({ blockHash, token, owner, value, proof });
|
||||
const entity = repo.create({ blockHash, blockNumber, token, owner, value, proof });
|
||||
return repo.save(entity);
|
||||
}
|
||||
|
||||
async saveAllowance ({ blockHash, token, owner, spender, value, proof }: DeepPartial<Allowance>): Promise<Allowance> {
|
||||
async saveAllowance ({ blockHash, blockNumber, token, owner, spender, value, proof }: DeepPartial<Allowance>): Promise<Allowance> {
|
||||
const repo = this._conn.getRepository(Allowance);
|
||||
const entity = repo.create({ blockHash, token, owner, spender, value, proof });
|
||||
const entity = repo.create({ blockHash, blockNumber, token, owner, spender, value, proof });
|
||||
return repo.save(entity);
|
||||
}
|
||||
|
||||
@ -124,16 +124,16 @@ export class Database {
|
||||
});
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
|
@ -5,7 +5,7 @@
|
||||
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
||||
|
||||
@Entity()
|
||||
@Index(['blockHash', 'token', 'owner', 'spender'], { unique: true })
|
||||
@Index(['blockHash', 'blockNumber', 'token', 'owner', 'spender'], { unique: true })
|
||||
export class Allowance {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
@ -13,6 +13,9 @@ export class Allowance {
|
||||
@Column('varchar', { length: 66 })
|
||||
blockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
token!: string;
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
import { Entity, PrimaryGeneratedColumn, Column, Index } from 'typeorm';
|
||||
|
||||
@Entity()
|
||||
@Index(['blockHash', 'token', 'owner'], { unique: true })
|
||||
@Index(['blockHash', 'blockNumber', 'token', 'owner'], { unique: true })
|
||||
export class Balance {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
@ -13,6 +13,9 @@ export class Balance {
|
||||
@Column('varchar', { length: 66 })
|
||||
blockHash!: string;
|
||||
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
token!: string;
|
||||
|
||||
|
@ -14,7 +14,7 @@ export class Event {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
|
||||
@ManyToOne(() => BlockProgress)
|
||||
@ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' })
|
||||
block!: BlockProgress;
|
||||
|
||||
@Column('varchar', { length: 66 })
|
||||
|
@ -70,15 +70,27 @@ export class EventWatcher {
|
||||
|
||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { id, data: { failed } } = job;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
});
|
||||
}
|
||||
|
||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
const { id, data: { request, failed, state, createdOn } } = job;
|
||||
|
||||
const { data: { request, failed, state, createdOn } } = job;
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
|
||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
|
||||
|
@ -117,10 +117,11 @@ export class Indexer {
|
||||
log('balanceOf: db miss, fetching from upstream server');
|
||||
let result: ValueResult;
|
||||
|
||||
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
|
||||
const blockNumber = BigNumber.from(number).toNumber();
|
||||
|
||||
if (this._serverMode === ETH_CALL_MODE) {
|
||||
const contract = new ethers.Contract(token, this._abi, this._ethProvider);
|
||||
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
|
||||
const blockNumber = BigNumber.from(number).toNumber();
|
||||
|
||||
// eth_call doesnt support calling method by blockHash https://eth.wiki/json-rpc/API#the-default-block-parameter
|
||||
const value = await contract.balanceOf(owner, { blockTag: blockNumber });
|
||||
@ -135,7 +136,7 @@ export class Indexer {
|
||||
log(JSONbig.stringify(result, null, 2));
|
||||
|
||||
const { value, proof } = result;
|
||||
await this._db.saveBalance({ blockHash, token, owner, value: BigInt(value), proof: JSONbig.stringify(proof) });
|
||||
await this._db.saveBalance({ blockHash, blockNumber, token, owner, value: BigInt(value), proof: JSONbig.stringify(proof) });
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -154,10 +155,11 @@ export class Indexer {
|
||||
log('allowance: db miss, fetching from upstream server');
|
||||
let result: ValueResult;
|
||||
|
||||
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
|
||||
const blockNumber = BigNumber.from(number).toNumber();
|
||||
|
||||
if (this._serverMode === ETH_CALL_MODE) {
|
||||
const contract = new ethers.Contract(token, this._abi, this._ethProvider);
|
||||
const { block: { number } } = await this._ethClient.getBlockByHash(blockHash);
|
||||
const blockNumber = BigNumber.from(number).toNumber();
|
||||
const value = await contract.allowance(owner, spender, { blockTag: blockNumber });
|
||||
|
||||
result = {
|
||||
@ -170,7 +172,7 @@ export class Indexer {
|
||||
// log(JSONbig.stringify(result, null, 2));
|
||||
|
||||
const { value, proof } = result;
|
||||
await this._db.saveAllowance({ blockHash, token, owner, spender, value: BigInt(value), proof: JSONbig.stringify(proof) });
|
||||
await this._db.saveAllowance({ blockHash, blockNumber, token, owner, spender, value: BigInt(value), proof: JSONbig.stringify(proof) });
|
||||
|
||||
return result;
|
||||
}
|
||||
@ -315,16 +317,16 @@ export class Indexer {
|
||||
return this._baseIndexer.getEventsInRange(fromBlockNumber, toBlockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
|
@ -72,6 +72,10 @@ export class EthClient {
|
||||
);
|
||||
}
|
||||
|
||||
async getBlocksByNumber (blockNumber: number): Promise<any> {
|
||||
return this._graphqlClient.query(ethQueries.getBlocksByNumber, { blockNumber });
|
||||
}
|
||||
|
||||
async getBlockByHash (blockHash: string): Promise<any> {
|
||||
return this._graphqlClient.query(ethQueries.getBlockByHash, { blockHash });
|
||||
}
|
||||
|
@ -64,6 +64,20 @@ query allEthHeaderCids($blockNumber: BigInt, $blockHash: String) {
|
||||
}
|
||||
`;
|
||||
|
||||
export const getBlocksByNumber = gql`
|
||||
query allEthHeaderCids($blockNumber: BigInt) {
|
||||
allEthHeaderCids(condition: { blockNumber: $blockNumber }) {
|
||||
nodes {
|
||||
cid
|
||||
blockNumber
|
||||
blockHash
|
||||
parentHash
|
||||
timestamp
|
||||
}
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
export const getBlockByHash = gql`
|
||||
query block($blockHash: Bytes32) {
|
||||
block(hash: $blockHash) {
|
||||
@ -113,6 +127,7 @@ export default {
|
||||
getStorageAt,
|
||||
getLogs,
|
||||
getBlockWithTransactions,
|
||||
getBlocksByNumber,
|
||||
getBlockByHash,
|
||||
subscribeBlocks,
|
||||
subscribeTransactions
|
||||
|
@ -39,7 +39,9 @@
|
||||
"fill:dev": "DEBUG=vulcanize:* ts-node src/fill.ts",
|
||||
"generate:schema": "get-graphql-schema https://api.thegraph.com/subgraphs/name/ianlapham/uniswap-v3-alt > docs/analysis/schema/full-schema.graphql",
|
||||
"generate:health-schema": "get-graphql-schema https://api.thegraph.com/index-node/graphql > docs/analysis/schema/health-schema.graphql",
|
||||
"lint:schema": "graphql-schema-linter"
|
||||
"lint:schema": "graphql-schema-linter",
|
||||
"reset": "DEBUG=vulcanize:* node --enable-source-maps dist/cli/reset.js",
|
||||
"reset:dev": "DEBUG=vulcanize:* ts-node src/cli/reset.ts"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/chance": "^1.1.2",
|
||||
|
22
packages/uni-info-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
22
packages/uni-info-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
@ -0,0 +1,22 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
|
||||
import { getConfig, resetJobs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset-job-queue');
|
||||
|
||||
export const command = 'job-queue';
|
||||
|
||||
export const desc = 'Reset job queue';
|
||||
|
||||
export const builder = {};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
|
||||
log('Job queue reset successfully');
|
||||
};
|
91
packages/uni-info-watcher/src/cli/reset-cmds/state.ts
Normal file
91
packages/uni-info-watcher/src/cli/reset-cmds/state.ts
Normal file
@ -0,0 +1,91 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
import { MoreThan } from 'typeorm';
|
||||
import assert from 'assert';
|
||||
|
||||
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
|
||||
import { Client as ERC20Client } from '@vulcanize/erc20-watcher';
|
||||
import { Client as UniClient } from '@vulcanize/uni-watcher';
|
||||
|
||||
import { Database } from '../../database';
|
||||
import { Indexer } from '../../indexer';
|
||||
import { BlockProgress } from '../../entity/BlockProgress';
|
||||
import { Factory } from '../../entity/Factory';
|
||||
import { Bundle } from '../../entity/Bundle';
|
||||
import { Pool } from '../../entity/Pool';
|
||||
import { Mint } from '../../entity/Mint';
|
||||
import { Burn } from '../../entity/Burn';
|
||||
import { Swap } from '../../entity/Swap';
|
||||
import { PositionSnapshot } from '../../entity/PositionSnapshot';
|
||||
import { Position } from '../../entity/Position';
|
||||
import { Token } from '../../entity/Token';
|
||||
|
||||
const log = debug('vulcanize:reset-state');
|
||||
|
||||
export const command = 'state';
|
||||
|
||||
export const desc = 'Reset state to block number';
|
||||
|
||||
export const builder = {
|
||||
blockNumber: {
|
||||
type: 'number'
|
||||
}
|
||||
};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
const { dbConfig, serverConfig, upstreamConfig, ethClient } = await getResetConfig(config);
|
||||
|
||||
// Initialize database.
|
||||
const db = new Database(dbConfig);
|
||||
await db.init();
|
||||
|
||||
const {
|
||||
uniWatcher,
|
||||
tokenWatcher
|
||||
} = upstreamConfig;
|
||||
|
||||
const uniClient = new UniClient(uniWatcher);
|
||||
const erc20Client = new ERC20Client(tokenWatcher);
|
||||
|
||||
const indexer = new Indexer(db, uniClient, erc20Client, ethClient, serverConfig.mode);
|
||||
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
assert(syncStatus, 'Missing syncStatus');
|
||||
|
||||
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
|
||||
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
|
||||
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
|
||||
const [blockProgress] = blockProgresses;
|
||||
|
||||
const dbTx = await db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
const removeEntitiesPromise = [BlockProgress, Factory, Bundle, Pool, Mint, Burn, Swap, PositionSnapshot, Position, Token].map(async entityClass => {
|
||||
return db.removeEntities<any>(dbTx, entityClass, { blockNumber: MoreThan(argv.blockNumber) });
|
||||
});
|
||||
|
||||
await Promise.all(removeEntitiesPromise);
|
||||
|
||||
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
log('Reset state successfully');
|
||||
};
|
24
packages/uni-info-watcher/src/cli/reset.ts
Normal file
24
packages/uni-info-watcher/src/cli/reset.ts
Normal file
@ -0,0 +1,24 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import 'reflect-metadata';
|
||||
import debug from 'debug';
|
||||
|
||||
import { getResetYargs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset');
|
||||
|
||||
const main = async () => {
|
||||
return getResetYargs()
|
||||
.commandDir('reset-cmds', { extensions: ['ts', 'js'], exclude: /([a-zA-Z0-9\s_\\.\-:])+(.d.ts)$/ })
|
||||
.demandCommand(1)
|
||||
.help()
|
||||
.argv;
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
process.exit();
|
||||
}).catch(err => {
|
||||
log(err);
|
||||
});
|
@ -569,16 +569,16 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
|
@ -22,19 +22,19 @@ export class Burn {
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@ManyToOne(() => Transaction, transaction => transaction.burns)
|
||||
@ManyToOne(() => Transaction, transaction => transaction.burns, { onDelete: 'CASCADE' })
|
||||
transaction!: Transaction
|
||||
|
||||
@Column('bigint')
|
||||
timestamp!: BigInt;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token0!: Token
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token1!: Token
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
|
@ -12,7 +12,7 @@ export class Event {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
|
||||
@ManyToOne(() => BlockProgress)
|
||||
@ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' })
|
||||
block!: BlockProgress;
|
||||
|
||||
@Column('varchar', { length: 66 })
|
||||
|
@ -22,19 +22,19 @@ export class Mint {
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@ManyToOne(() => Transaction, transaction => transaction.mints)
|
||||
@ManyToOne(() => Transaction, transaction => transaction.mints, { onDelete: 'CASCADE' })
|
||||
transaction!: Transaction
|
||||
|
||||
@Column('bigint')
|
||||
timestamp!: BigInt;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token0!: Token
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token1!: Token
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
|
@ -20,10 +20,10 @@ export class Pool {
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token0!: Token;
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token1!: Token;
|
||||
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
|
@ -23,7 +23,7 @@ export class PoolDayData {
|
||||
@Column('integer')
|
||||
date!: number;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool;
|
||||
|
||||
@Column('numeric', { transformer: decimalTransformer })
|
||||
|
@ -23,7 +23,7 @@ export class PoolHourData {
|
||||
@Column('integer')
|
||||
periodStartUnix!: number;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool;
|
||||
|
||||
@Column('numeric', { transformer: decimalTransformer })
|
||||
|
@ -54,7 +54,7 @@ export class Position {
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
collectedFeesToken1!: Decimal
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
|
@ -56,12 +56,12 @@ export class PositionSnapshot {
|
||||
@Column('numeric', { default: 0, transformer: decimalTransformer })
|
||||
collectedFeesToken1!: Decimal
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@ManyToOne(() => Position)
|
||||
@ManyToOne(() => Position, { onDelete: 'CASCADE' })
|
||||
position!: Position
|
||||
|
||||
@ManyToOne(() => Transaction)
|
||||
@ManyToOne(() => Transaction, { onDelete: 'CASCADE' })
|
||||
transaction!: Transaction
|
||||
}
|
||||
|
@ -22,19 +22,19 @@ export class Swap {
|
||||
@Column('integer')
|
||||
blockNumber!: number;
|
||||
|
||||
@ManyToOne(() => Transaction, transaction => transaction.swaps)
|
||||
@ManyToOne(() => Transaction, transaction => transaction.swaps, { onDelete: 'CASCADE' })
|
||||
transaction!: Transaction
|
||||
|
||||
@Column('bigint')
|
||||
timestamp!: BigInt;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token0!: Token
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token1!: Token
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
|
@ -23,7 +23,7 @@ export class Tick {
|
||||
@Column('bigint')
|
||||
tickIdx!: BigInt;
|
||||
|
||||
@ManyToOne(() => Pool)
|
||||
@ManyToOne(() => Pool, { onDelete: 'CASCADE' })
|
||||
pool!: Pool
|
||||
|
||||
@Column('varchar', { length: 42 })
|
||||
|
@ -23,7 +23,7 @@ export class TokenDayData {
|
||||
@Column('integer')
|
||||
date!: number
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token!: Token
|
||||
|
||||
@Column('numeric', { transformer: decimalTransformer })
|
||||
|
@ -23,7 +23,7 @@ export class TokenHourData {
|
||||
@Column('integer')
|
||||
periodStartUnix!: number
|
||||
|
||||
@ManyToOne(() => Token)
|
||||
@ManyToOne(() => Token, { onDelete: 'CASCADE' })
|
||||
token!: Token
|
||||
|
||||
@Column('numeric', { transformer: decimalTransformer })
|
||||
|
@ -157,12 +157,26 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
|
||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||
await this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { id, data: { failed } } = job;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
});
|
||||
}
|
||||
|
||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const { id, data: { failed } } = job;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
});
|
||||
}
|
||||
|
@ -308,16 +308,16 @@ export class Indexer implements IndexerInterface {
|
||||
return this._baseIndexer.getBlockEvents(blockHash);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
|
@ -19,7 +19,9 @@
|
||||
"fill": "DEBUG=vulcanize:* node --enable-source-maps dist/fill.js",
|
||||
"fill:dev": "DEBUG=vulcanize:* ts-node src/fill.ts",
|
||||
"watch:contract": "node --enable-source-maps dist/cli/watch-contract.js",
|
||||
"watch:contract:dev": "ts-node src/cli/watch-contract.ts"
|
||||
"watch:contract:dev": "ts-node src/cli/watch-contract.ts",
|
||||
"reset": "DEBUG=vulcanize:* node --enable-source-maps dist/cli/reset.js",
|
||||
"reset:dev": "DEBUG=vulcanize:* ts-node src/cli/reset.ts"
|
||||
},
|
||||
"repository": {
|
||||
"type": "git",
|
||||
|
22
packages/uni-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
22
packages/uni-watcher/src/cli/reset-cmds/job-queue.ts
Normal file
@ -0,0 +1,22 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
|
||||
import { getConfig, resetJobs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset-job-queue');
|
||||
|
||||
export const command = 'job-queue';
|
||||
|
||||
export const desc = 'Reset job queue';
|
||||
|
||||
export const builder = {};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
|
||||
log('Job queue reset successfully');
|
||||
};
|
68
packages/uni-watcher/src/cli/reset-cmds/state.ts
Normal file
68
packages/uni-watcher/src/cli/reset-cmds/state.ts
Normal file
@ -0,0 +1,68 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
import { MoreThan } from 'typeorm';
|
||||
import assert from 'assert';
|
||||
|
||||
import { getConfig, getResetConfig, resetJobs } from '@vulcanize/util';
|
||||
|
||||
import { Database } from '../../database';
|
||||
import { Indexer } from '../../indexer';
|
||||
import { BlockProgress } from '../../entity/BlockProgress';
|
||||
|
||||
const log = debug('vulcanize:reset-state');
|
||||
|
||||
export const command = 'state';
|
||||
|
||||
export const desc = 'Reset state to block number';
|
||||
|
||||
export const builder = {
|
||||
blockNumber: {
|
||||
type: 'number'
|
||||
}
|
||||
};
|
||||
|
||||
export const handler = async (argv: any): Promise<void> => {
|
||||
const config = await getConfig(argv.configFile);
|
||||
await resetJobs(config);
|
||||
const { dbConfig, ethClient, postgraphileClient } = await getResetConfig(config);
|
||||
|
||||
// Initialize database.
|
||||
const db = new Database(dbConfig);
|
||||
await db.init();
|
||||
|
||||
const indexer = new Indexer(db, ethClient, postgraphileClient);
|
||||
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
assert(syncStatus, 'Missing syncStatus');
|
||||
|
||||
const blockProgresses = await indexer.getBlocksAtHeight(argv.blockNumber, false);
|
||||
assert(blockProgresses.length, `No blocks at specified block number ${argv.blockNumber}`);
|
||||
assert(!blockProgresses.some(block => !block.isComplete), `Incomplete block at block number ${argv.blockNumber} with unprocessed events`);
|
||||
const [blockProgress] = blockProgresses;
|
||||
|
||||
const dbTx = await db.createTransactionRunner();
|
||||
|
||||
try {
|
||||
await db.removeEntities(dbTx, BlockProgress, { blockNumber: MoreThan(blockProgress.blockNumber) });
|
||||
|
||||
if (syncStatus.latestIndexedBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusIndexedBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
if (syncStatus.latestCanonicalBlockNumber > blockProgress.blockNumber) {
|
||||
await indexer.updateSyncStatusCanonicalBlock(blockProgress.blockHash, blockProgress.blockNumber, true);
|
||||
}
|
||||
|
||||
dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await dbTx.release();
|
||||
}
|
||||
|
||||
log('Reset state successfully');
|
||||
};
|
24
packages/uni-watcher/src/cli/reset.ts
Normal file
24
packages/uni-watcher/src/cli/reset.ts
Normal file
@ -0,0 +1,24 @@
|
||||
//
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import 'reflect-metadata';
|
||||
import debug from 'debug';
|
||||
|
||||
import { getResetYargs } from '@vulcanize/util';
|
||||
|
||||
const log = debug('vulcanize:reset');
|
||||
|
||||
const main = async () => {
|
||||
return getResetYargs()
|
||||
.commandDir('reset-cmds', { extensions: ['js', 'ts'], exclude: /([a-zA-Z0-9\s_\\.\-:])+(.d.ts)$/ })
|
||||
.demandCommand(1)
|
||||
.help()
|
||||
.argv;
|
||||
};
|
||||
|
||||
main().then(() => {
|
||||
process.exit();
|
||||
}).catch(err => {
|
||||
log(err);
|
||||
});
|
@ -91,16 +91,16 @@ export class Database implements DatabaseInterface {
|
||||
return this._baseDatabase.saveEvents(blockRepo, eventRepo, block, events);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusIndexedBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
const repo = queryRunner.manager.getRepository(SyncStatus);
|
||||
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber);
|
||||
return this._baseDatabase.updateSyncStatusCanonicalBlock(repo, blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
|
@ -14,7 +14,7 @@ export class Event {
|
||||
@PrimaryGeneratedColumn()
|
||||
id!: number;
|
||||
|
||||
@ManyToOne(() => BlockProgress)
|
||||
@ManyToOne(() => BlockProgress, { onDelete: 'CASCADE' })
|
||||
block!: BlockProgress;
|
||||
|
||||
@Column('varchar', { length: 66 })
|
||||
|
@ -67,15 +67,27 @@ export class EventWatcher implements EventWatcherInterface {
|
||||
|
||||
async initBlockProcessingOnCompleteHandler (): Promise<void> {
|
||||
this._jobQueue.onComplete(QUEUE_BLOCK_PROCESSING, async (job) => {
|
||||
const { id, data: { failed } } = job;
|
||||
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_BLOCK_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this._baseEventWatcher.blockProcessingCompleteHandler(job);
|
||||
});
|
||||
}
|
||||
|
||||
async initEventProcessingOnCompleteHandler (): Promise<void> {
|
||||
await this._jobQueue.onComplete(QUEUE_EVENT_PROCESSING, async (job) => {
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
const { id, data: { request, failed, state, createdOn } } = job;
|
||||
|
||||
const { data: { request, failed, state, createdOn } } = job;
|
||||
if (failed) {
|
||||
log(`Job ${id} for queue ${QUEUE_EVENT_PROCESSING} failed`);
|
||||
return;
|
||||
}
|
||||
|
||||
const dbEvent = await this._baseEventWatcher.eventProcessingCompleteHandler(job);
|
||||
|
||||
const timeElapsedInSeconds = (Date.now() - Date.parse(createdOn)) / 1000;
|
||||
log(`Job onComplete event ${request.data.id} publish ${!!request.data.publish}`);
|
||||
|
@ -324,16 +324,16 @@ export class Indexer implements IndexerInterface {
|
||||
return this._baseIndexer.getBlockEvents(blockHash);
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusIndexedBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber);
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatus> {
|
||||
return this._baseIndexer.updateSyncStatusCanonicalBlock(blockHash, blockNumber, force);
|
||||
}
|
||||
|
||||
async getSyncStatus (): Promise<SyncStatus | undefined> {
|
||||
|
@ -2,13 +2,17 @@
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import fs from 'fs-extra';
|
||||
import path from 'path';
|
||||
import toml from 'toml';
|
||||
import debug from 'debug';
|
||||
import { ConnectionOptions } from 'typeorm';
|
||||
import { getDefaultProvider } from 'ethers';
|
||||
|
||||
import { Config as CacheConfig } from '@vulcanize/cache';
|
||||
import { BaseProvider } from '@ethersproject/providers';
|
||||
import { Config as CacheConfig, getCache } from '@vulcanize/cache';
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
|
||||
const log = debug('vulcanize:config');
|
||||
|
||||
@ -18,31 +22,35 @@ export interface JobQueueConfig {
|
||||
jobDelayInMilliSecs?: number;
|
||||
}
|
||||
|
||||
export interface Config {
|
||||
server: {
|
||||
host: string;
|
||||
port: number;
|
||||
mode: string;
|
||||
kind: string;
|
||||
interface ServerConfig {
|
||||
host: string;
|
||||
port: number;
|
||||
mode: string;
|
||||
kind: string;
|
||||
}
|
||||
|
||||
interface UpstreamConfig {
|
||||
cache: CacheConfig,
|
||||
ethServer: {
|
||||
gqlApiEndpoint: string;
|
||||
gqlPostgraphileEndpoint: string;
|
||||
rpcProviderEndpoint: string
|
||||
}
|
||||
traceProviderEndpoint: string;
|
||||
uniWatcher: {
|
||||
gqlEndpoint: string;
|
||||
gqlSubscriptionEndpoint: string;
|
||||
};
|
||||
tokenWatcher: {
|
||||
gqlEndpoint: string;
|
||||
gqlSubscriptionEndpoint: string;
|
||||
}
|
||||
}
|
||||
|
||||
export interface Config {
|
||||
server: ServerConfig;
|
||||
database: ConnectionOptions;
|
||||
upstream: {
|
||||
cache: CacheConfig,
|
||||
ethServer: {
|
||||
gqlApiEndpoint: string;
|
||||
gqlPostgraphileEndpoint: string;
|
||||
rpcProviderEndpoint: string
|
||||
}
|
||||
traceProviderEndpoint: string;
|
||||
uniWatcher: {
|
||||
gqlEndpoint: string;
|
||||
gqlSubscriptionEndpoint: string;
|
||||
};
|
||||
tokenWatcher: {
|
||||
gqlEndpoint: string;
|
||||
gqlSubscriptionEndpoint: string;
|
||||
}
|
||||
},
|
||||
upstream: UpstreamConfig,
|
||||
jobQueue: JobQueueConfig
|
||||
}
|
||||
|
||||
@ -58,3 +66,47 @@ export const getConfig = async (configFile: string): Promise<Config> => {
|
||||
|
||||
return config;
|
||||
};
|
||||
|
||||
export const getResetConfig = async (config: Config): Promise<{
|
||||
dbConfig: ConnectionOptions,
|
||||
serverConfig: ServerConfig,
|
||||
upstreamConfig: UpstreamConfig,
|
||||
ethClient: EthClient,
|
||||
postgraphileClient: EthClient,
|
||||
ethProvider: BaseProvider
|
||||
}> => {
|
||||
const { database: dbConfig, upstream: upstreamConfig, server: serverConfig } = config;
|
||||
|
||||
assert(serverConfig, 'Missing server config');
|
||||
assert(dbConfig, 'Missing database config');
|
||||
|
||||
assert(upstreamConfig, 'Missing upstream config');
|
||||
const { ethServer: { gqlApiEndpoint, gqlPostgraphileEndpoint, rpcProviderEndpoint }, cache: cacheConfig } = upstreamConfig;
|
||||
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');
|
||||
assert(gqlPostgraphileEndpoint, 'Missing upstream ethServer.gqlPostgraphileEndpoint');
|
||||
assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint');
|
||||
|
||||
const cache = await getCache(cacheConfig);
|
||||
|
||||
const ethClient = new EthClient({
|
||||
gqlEndpoint: gqlApiEndpoint,
|
||||
gqlSubscriptionEndpoint: gqlPostgraphileEndpoint,
|
||||
cache
|
||||
});
|
||||
|
||||
const postgraphileClient = new EthClient({
|
||||
gqlEndpoint: gqlPostgraphileEndpoint,
|
||||
cache
|
||||
});
|
||||
|
||||
const ethProvider = getDefaultProvider(rpcProviderEndpoint);
|
||||
|
||||
return {
|
||||
dbConfig,
|
||||
serverConfig,
|
||||
upstreamConfig,
|
||||
ethClient,
|
||||
postgraphileClient,
|
||||
ethProvider
|
||||
};
|
||||
};
|
||||
|
@ -97,11 +97,11 @@ export class Database {
|
||||
return repo.findOne();
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusIndexedBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (blockNumber >= entity.latestIndexedBlockNumber) {
|
||||
if (force || blockNumber >= entity.latestIndexedBlockNumber) {
|
||||
entity.latestIndexedBlockHash = blockHash;
|
||||
entity.latestIndexedBlockNumber = blockNumber;
|
||||
}
|
||||
@ -109,11 +109,11 @@ export class Database {
|
||||
return await repo.save(entity);
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusCanonicalBlock (repo: Repository<SyncStatusInterface>, blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const entity = await repo.findOne();
|
||||
assert(entity);
|
||||
|
||||
if (blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
if (force || blockNumber >= entity.latestCanonicalBlockNumber) {
|
||||
entity.latestCanonicalBlockHash = blockHash;
|
||||
entity.latestCanonicalBlockNumber = blockNumber;
|
||||
}
|
||||
|
@ -3,6 +3,7 @@
|
||||
//
|
||||
|
||||
import debug from 'debug';
|
||||
import assert from 'assert';
|
||||
|
||||
import { EthClient } from '@vulcanize/ipld-eth-client';
|
||||
|
||||
@ -19,33 +20,24 @@ export const fillBlocks = async (
|
||||
eventWatcher: EventWatcherInterface,
|
||||
{ startBlock, endBlock }: { startBlock: number, endBlock: number}
|
||||
): Promise<any> => {
|
||||
assert(startBlock < endBlock, 'endBlock should be greater than startBlock');
|
||||
|
||||
await eventWatcher.initBlockProcessingOnCompleteHandler();
|
||||
await eventWatcher.initEventProcessingOnCompleteHandler();
|
||||
|
||||
for (let blockNumber = startBlock; blockNumber <= endBlock; blockNumber++) {
|
||||
log(`Fill block ${blockNumber}`);
|
||||
let currentBlockNumber = startBlock;
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
|
||||
// TODO: Add pause between requests so as to not overwhelm the upsteam server.
|
||||
const result = await ethClient.getBlockWithTransactions({ blockNumber });
|
||||
const { allEthHeaderCids: { nodes: blockNodes } } = result;
|
||||
for (let bi = 0; bi < blockNodes.length; bi++) {
|
||||
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
|
||||
const blockProgress = await indexer.getBlockProgress(blockHash);
|
||||
|
||||
if (blockProgress) {
|
||||
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
|
||||
} else {
|
||||
const syncStatus = await indexer.getSyncStatus();
|
||||
|
||||
if (!syncStatus || syncStatus.chainHeadBlockNumber < blockNumber) {
|
||||
await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
}
|
||||
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
|
||||
}
|
||||
if (syncStatus) {
|
||||
if (currentBlockNumber > syncStatus.latestIndexedBlockNumber + 1) {
|
||||
throw new Error(`Missing blocks between startBlock ${currentBlockNumber} and latestIndexedBlockNumber ${syncStatus.latestIndexedBlockNumber}`);
|
||||
}
|
||||
|
||||
currentBlockNumber = syncStatus.latestIndexedBlockNumber + 1;
|
||||
}
|
||||
|
||||
processBlockByNumber(jobQueue, indexer, ethClient, currentBlockNumber);
|
||||
|
||||
// Creating an AsyncIterable from AsyncIterator to iterate over the values.
|
||||
// https://www.codementor.io/@tiagolopesferreira/asynchronous-iterators-in-javascript-jl1yg8la1#for-wait-of
|
||||
const blockProgressEventIterable = {
|
||||
@ -58,9 +50,46 @@ export const fillBlocks = async (
|
||||
for await (const data of blockProgressEventIterable) {
|
||||
const { onBlockProgressEvent: { blockNumber, isComplete } } = data;
|
||||
|
||||
if (blockNumber >= endBlock && isComplete) {
|
||||
// Break the async loop if blockProgress event is for the endBlock and processing is complete.
|
||||
break;
|
||||
if (blockNumber === currentBlockNumber && isComplete) {
|
||||
if (blockNumber >= endBlock) {
|
||||
// Break the async loop when blockProgress event is for the endBlock and processing is complete.
|
||||
break;
|
||||
}
|
||||
|
||||
currentBlockNumber++;
|
||||
processBlockByNumber(jobQueue, indexer, ethClient, currentBlockNumber);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Method to fetch block by number and push to job queue.
|
||||
* @param jobQueue
|
||||
* @param indexer
|
||||
* @param ethClient
|
||||
* @param blockNumber
|
||||
*/
|
||||
const processBlockByNumber = async (
|
||||
jobQueue: JobQueue,
|
||||
indexer: IndexerInterface,
|
||||
ethClient: EthClient,
|
||||
blockNumber: number
|
||||
) => {
|
||||
log(`Fill block ${blockNumber}`);
|
||||
|
||||
const result = await ethClient.getBlocksByNumber(blockNumber);
|
||||
const { allEthHeaderCids: { nodes: blockNodes } } = result;
|
||||
|
||||
for (let bi = 0; bi < blockNodes.length; bi++) {
|
||||
const { blockHash, blockNumber, parentHash, timestamp } = blockNodes[bi];
|
||||
const blockProgress = await indexer.getBlockProgress(blockHash);
|
||||
|
||||
if (blockProgress) {
|
||||
log(`Block number ${blockNumber}, block hash ${blockHash} already known, skip filling`);
|
||||
} else {
|
||||
await indexer.updateSyncStatusChainHead(blockHash, blockNumber);
|
||||
|
||||
await jobQueue.pushJob(QUEUE_BLOCK_PROCESSING, { kind: JOB_KIND_INDEX, blockHash, blockNumber, parentHash, timestamp });
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -2,8 +2,15 @@
|
||||
// Copyright 2021 Vulcanize, Inc.
|
||||
//
|
||||
|
||||
import assert from 'assert';
|
||||
import Decimal from 'decimal.js';
|
||||
import { ValueTransformer } from 'typeorm';
|
||||
import yargs from 'yargs';
|
||||
import { hideBin } from 'yargs/helpers';
|
||||
|
||||
import { DEFAULT_CONFIG_PATH } from './constants';
|
||||
import { Config } from './config';
|
||||
import { JobQueue } from './job-queue';
|
||||
|
||||
/**
|
||||
* Method to wait for specified time.
|
||||
@ -50,3 +57,30 @@ export const bigintTransformer: ValueTransformer = {
|
||||
return value;
|
||||
}
|
||||
};
|
||||
|
||||
export const resetJobs = async (config: Config): Promise<void> => {
|
||||
const { jobQueue: jobQueueConfig } = config;
|
||||
|
||||
const { dbConnectionString, maxCompletionLagInSecs } = jobQueueConfig;
|
||||
assert(dbConnectionString, 'Missing job queue db connection string');
|
||||
|
||||
const jobQueue = new JobQueue({ dbConnectionString, maxCompletionLag: maxCompletionLagInSecs });
|
||||
await jobQueue.start();
|
||||
await jobQueue.deleteAllJobs();
|
||||
};
|
||||
|
||||
export const getResetYargs = (): yargs.Argv => {
|
||||
return yargs(hideBin(process.argv))
|
||||
.parserConfiguration({
|
||||
'parse-numbers': false
|
||||
}).options({
|
||||
configFile: {
|
||||
alias: 'f',
|
||||
type: 'string',
|
||||
require: true,
|
||||
demandOption: true,
|
||||
describe: 'configuration file path (toml)',
|
||||
default: DEFAULT_CONFIG_PATH
|
||||
}
|
||||
});
|
||||
};
|
||||
|
@ -52,12 +52,12 @@ export class Indexer {
|
||||
return res;
|
||||
}
|
||||
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber);
|
||||
res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber, force);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
@ -86,12 +86,12 @@ export class Indexer {
|
||||
return res;
|
||||
}
|
||||
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface> {
|
||||
async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force = false): Promise<SyncStatusInterface> {
|
||||
const dbTx = await this._db.createTransactionRunner();
|
||||
let res;
|
||||
|
||||
try {
|
||||
res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber);
|
||||
res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber, force);
|
||||
await dbTx.commitTransaction();
|
||||
} catch (error) {
|
||||
await dbTx.rollbackTransaction();
|
||||
|
@ -81,4 +81,8 @@ export class JobQueue {
|
||||
const jobId = await this._boss.publish(queue, job, options);
|
||||
log(`Created job in queue ${queue}: ${jobId}`);
|
||||
}
|
||||
|
||||
async deleteAllJobs (): Promise<void> {
|
||||
await this._boss.deleteAllQueues();
|
||||
}
|
||||
}
|
||||
|
@ -57,8 +57,8 @@ export interface IndexerInterface {
|
||||
getOrFetchBlockEvents (block: DeepPartial<BlockProgressInterface>): Promise<Array<EventInterface>>
|
||||
updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
||||
updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise<SyncStatusInterface>
|
||||
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
|
||||
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>;
|
||||
}
|
||||
|
||||
@ -80,9 +80,9 @@ export interface DatabaseInterface {
|
||||
getEventsInRange (fromBlockNumber: number, toBlockNumber: number): Promise<Array<EventInterface>>;
|
||||
markBlocksAsPruned (queryRunner: QueryRunner, blocks: BlockProgressInterface[]): Promise<void>;
|
||||
updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise<void>
|
||||
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise<SyncStatusInterface>;
|
||||
updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>;
|
||||
saveEvents (queryRunner: QueryRunner, block: DeepPartial<BlockProgressInterface>, events: DeepPartial<EventInterface>[]): Promise<void>;
|
||||
saveEventEntity (queryRunner: QueryRunner, entity: EventInterface): Promise<EventInterface>;
|
||||
removeEntities<Entity> (queryRunner: QueryRunner, entity: new () => Entity, findConditions?: FindConditions<Entity>): Promise<void>;
|
||||
|
Loading…
Reference in New Issue
Block a user