diff --git a/packages/uni-watcher/src/database.ts b/packages/uni-watcher/src/database.ts index e7730f26..e30d9dbd 100644 --- a/packages/uni-watcher/src/database.ts +++ b/packages/uni-watcher/src/database.ts @@ -1,6 +1,6 @@ import assert from 'assert'; import _ from 'lodash'; -import { Connection, ConnectionOptions, createConnection, DeepPartial } from 'typeorm'; +import { Connection, ConnectionOptions, createConnection, DeepPartial, QueryRunner } from 'typeorm'; import { SnakeNamingStrategy } from 'typeorm-naming-strategies'; import { Event, UNKNOWN_EVENT_NAME } from './entity/Event'; @@ -30,6 +30,13 @@ export class Database { return this._conn.close(); } + async createTransactionRunner (): Promise { + const queryRunner = this._conn.createQueryRunner(); + await queryRunner.connect(); + await queryRunner.startTransaction(); + return queryRunner; + } + async getBlockEvents (blockHash: string): Promise { return this._conn.getRepository(Event) .createQueryBuilder('event') @@ -68,7 +75,7 @@ export class Database { return events; } - async saveEvents (block: any, events: DeepPartial[]): Promise { + async saveEvents (queryRunner: QueryRunner, block: any, events: DeepPartial[]): Promise { const { hash: blockHash, number: blockNumber, @@ -86,89 +93,81 @@ export class Database { // In a transaction: // (1) Save all the events in the database. // (2) Add an entry to the block progress table. - await this._conn.transaction(async (tx) => { - const numEvents = events.length; - const blockProgressRepo = tx.getRepository(BlockProgress); - let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); - if (!blockProgress) { - const entity = blockProgressRepo.create({ - blockHash, - parentHash, - blockNumber, - blockTimestamp, - numEvents, - numProcessedEvents: 0, - lastProcessedEventIndex: -1, - isComplete: (numEvents === 0) - }); + const numEvents = events.length; + const blockProgressRepo = queryRunner.manager.getRepository(BlockProgress); + let blockProgress = await blockProgressRepo.findOne({ where: { blockHash } }); + if (!blockProgress) { + const entity = blockProgressRepo.create({ + blockHash, + parentHash, + blockNumber, + blockTimestamp, + numEvents, + numProcessedEvents: 0, + lastProcessedEventIndex: -1, + isComplete: (numEvents === 0) + }); - blockProgress = await blockProgressRepo.save(entity); + blockProgress = await blockProgressRepo.save(entity); - // Bulk insert events. - events.forEach(event => { - event.block = blockProgress; - }); + // Bulk insert events. + events.forEach(event => { + event.block = blockProgress; + }); - await tx.createQueryBuilder().insert().into(Event).values(events).execute(); - } - }); + await queryRunner.manager.createQueryBuilder().insert().into(Event).values(events).execute(); + } } - async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { - return await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(SyncStatus); + async updateSyncStatusIndexedBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); - const entity = await repo.findOne(); - assert(entity); + const entity = await repo.findOne(); + assert(entity); - if (blockNumber >= entity.latestIndexedBlockNumber) { - entity.latestIndexedBlockHash = blockHash; - entity.latestIndexedBlockNumber = blockNumber; - } + if (blockNumber >= entity.latestIndexedBlockNumber) { + entity.latestIndexedBlockHash = blockHash; + entity.latestIndexedBlockNumber = blockNumber; + } - return await repo.save(entity); - }); + return await repo.save(entity); } - async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { - return await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(SyncStatus); + async updateSyncStatusCanonicalBlock (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); - const entity = await repo.findOne(); - assert(entity); + const entity = await repo.findOne(); + assert(entity); - if (blockNumber >= entity.latestCanonicalBlockNumber) { - entity.latestCanonicalBlockHash = blockHash; - entity.latestCanonicalBlockNumber = blockNumber; - } + if (blockNumber >= entity.latestCanonicalBlockNumber) { + entity.latestCanonicalBlockHash = blockHash; + entity.latestCanonicalBlockNumber = blockNumber; + } - return await repo.save(entity); - }); + return await repo.save(entity); } - async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { - return await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(SyncStatus); + async updateSyncStatusChainHead (queryRunner: QueryRunner, blockHash: string, blockNumber: number): Promise { + const repo = queryRunner.manager.getRepository(SyncStatus); - let entity = await repo.findOne(); - if (!entity) { - entity = repo.create({ - chainHeadBlockHash: blockHash, - chainHeadBlockNumber: blockNumber, - latestCanonicalBlockHash: blockHash, - latestCanonicalBlockNumber: blockNumber, - latestIndexedBlockHash: '', - latestIndexedBlockNumber: -1 - }); - } + let entity = await repo.findOne(); + if (!entity) { + entity = repo.create({ + chainHeadBlockHash: blockHash, + chainHeadBlockNumber: blockNumber, + latestCanonicalBlockHash: blockHash, + latestCanonicalBlockNumber: blockNumber, + latestIndexedBlockHash: '', + latestIndexedBlockNumber: -1 + }); + } - if (blockNumber >= entity.chainHeadBlockNumber) { - entity.chainHeadBlockHash = blockHash; - entity.chainHeadBlockNumber = blockNumber; - } + if (blockNumber >= entity.chainHeadBlockNumber) { + entity.chainHeadBlockHash = blockHash; + entity.chainHeadBlockNumber = blockNumber; + } - return await repo.save(entity); - }); + return await repo.save(entity); } async getSyncStatus (): Promise { @@ -180,8 +179,8 @@ export class Database { return this._conn.getRepository(Event).findOne(id, { relations: ['block'] }); } - async saveEventEntity (entity: Event): Promise { - const repo = this._conn.getRepository(Event); + async saveEventEntity (queryRunner: QueryRunner, entity: Event): Promise { + const repo = queryRunner.manager.getRepository(Event); return await repo.save(entity); } @@ -200,20 +199,18 @@ export class Database { .getOne(); } - async saveContract (address: string, kind: string, startingBlock: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(Contract); + async saveContract (queryRunner: QueryRunner, address: string, kind: string, startingBlock: number): Promise { + const repo = queryRunner.manager.getRepository(Contract); - const numRows = await repo - .createQueryBuilder() - .where('address = :address', { address }) - .getCount(); + const numRows = await repo + .createQueryBuilder() + .where('address = :address', { address }) + .getCount(); - if (numRows === 0) { - const entity = repo.create({ address, kind, startingBlock }); - await repo.save(entity); - } - }); + if (numRows === 0) { + const entity = repo.create({ address, kind, startingBlock }); + await repo.save(entity); + } } async getBlocksAtHeight (height: number, isPruned: boolean): Promise { @@ -223,8 +220,8 @@ export class Database { .getMany(); } - async markBlockAsPruned (block: BlockProgress): Promise { - const repo = this._conn.getRepository(BlockProgress); + async markBlockAsPruned (queryRunner: QueryRunner, block: BlockProgress): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); block.isPruned = true; return repo.save(block); } @@ -234,23 +231,21 @@ export class Database { return repo.findOne({ where: { blockHash } }); } - async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - await this._conn.transaction(async (tx) => { - const repo = tx.getRepository(BlockProgress); - const entity = await repo.findOne({ where: { blockHash } }); - if (entity && !entity.isComplete) { - if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { - throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); - } - - entity.lastProcessedEventIndex = lastProcessedEventIndex; - entity.numProcessedEvents++; - if (entity.numProcessedEvents >= entity.numEvents) { - entity.isComplete = true; - } - - await repo.save(entity); + async updateBlockProgress (queryRunner: QueryRunner, blockHash: string, lastProcessedEventIndex: number): Promise { + const repo = queryRunner.manager.getRepository(BlockProgress); + const entity = await repo.findOne({ where: { blockHash } }); + if (entity && !entity.isComplete) { + if (lastProcessedEventIndex <= entity.lastProcessedEventIndex) { + throw new Error(`Events processed out of order ${blockHash}, was ${entity.lastProcessedEventIndex}, got ${lastProcessedEventIndex}`); } - }); + + entity.lastProcessedEventIndex = lastProcessedEventIndex; + entity.numProcessedEvents++; + if (entity.numProcessedEvents >= entity.numEvents) { + entity.isComplete = true; + } + + await repo.save(entity); + } } } diff --git a/packages/uni-watcher/src/indexer.ts b/packages/uni-watcher/src/indexer.ts index 6d406c07..9ea163bf 100644 --- a/packages/uni-watcher/src/indexer.ts +++ b/packages/uni-watcher/src/indexer.ts @@ -1,5 +1,5 @@ import debug from 'debug'; -import { DeepPartial } from 'typeorm'; +import { DeepPartial, QueryRunner } from 'typeorm'; import JSONbig from 'json-bigint'; import { ethers } from 'ethers'; import assert from 'assert'; @@ -138,13 +138,13 @@ export class Indexer { return result; } - async triggerIndexingOnEvent (dbEvent: Event): Promise { + async triggerIndexingOnEvent (dbTx: QueryRunner, dbEvent: Event): Promise { const re = this.getResultEvent(dbEvent); switch (re.event.__typename) { case 'PoolCreatedEvent': { const poolContract = ethers.utils.getAddress(re.event.pool); - await this._db.saveContract(poolContract, KIND_POOL, dbEvent.block.blockNumber); + await this._db.saveContract(dbTx, poolContract, KIND_POOL, dbEvent.block.blockNumber); } } } @@ -154,8 +154,18 @@ export class Indexer { } async processEvent (event: Event): Promise { - // Trigger indexing of data based on the event. - await this.triggerIndexingOnEvent(event); + const dbTx = await this._db.createTransactionRunner(); + + try { + // Trigger indexing of data based on the event. + await this.triggerIndexingOnEvent(dbTx, event); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } parseEventNameAndArgs (kind: string, logObj: any): any { @@ -371,19 +381,68 @@ export class Indexer { }); } - await this._db.saveEvents(block, dbEvents); + const dbTx = await this._db.createTransactionRunner(); + + try { + await this._db.saveEvents(dbTx, block, dbEvents); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } async updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number): Promise { - return this._db.updateSyncStatusIndexedBlock(blockHash, blockNumber); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateSyncStatusIndexedBlock(dbTx, blockHash, blockNumber); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async updateSyncStatusChainHead (blockHash: string, blockNumber: number): Promise { - return this._db.updateSyncStatusChainHead(blockHash, blockNumber); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateSyncStatusChainHead(dbTx, blockHash, blockNumber); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number): Promise { - return this._db.updateSyncStatusCanonicalBlock(blockHash, blockNumber); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateSyncStatusCanonicalBlock(dbTx, blockHash, blockNumber); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getSyncStatus (): Promise { @@ -400,7 +459,20 @@ export class Indexer { } async saveEventEntity (dbEvent: Event): Promise { - return this._db.saveEventEntity(dbEvent); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = this._db.saveEventEntity(dbTx, dbEvent); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getBlockProgress (blockHash: string): Promise { @@ -439,11 +511,37 @@ export class Indexer { } async markBlockAsPruned (block: BlockProgress): Promise { - return this._db.markBlockAsPruned(block); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.markBlockAsPruned(dbTx, block); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async updateBlockProgress (blockHash: string, lastProcessedEventIndex: number): Promise { - return this._db.updateBlockProgress(blockHash, lastProcessedEventIndex); + const dbTx = await this._db.createTransactionRunner(); + let res; + + try { + res = await this._db.updateBlockProgress(dbTx, blockHash, lastProcessedEventIndex); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } + + return res; } async getProcessedBlockCountForRange (fromBlockNumber: number, toBlockNumber: number): Promise<{ expected: number, actual: number }> { diff --git a/packages/uni-watcher/src/utils/index.ts b/packages/uni-watcher/src/utils/index.ts index 661739c2..201f6706 100644 --- a/packages/uni-watcher/src/utils/index.ts +++ b/packages/uni-watcher/src/utils/index.ts @@ -6,8 +6,17 @@ import { Client as UniClient } from '../client'; export async function watchContract (db: Database, address: string, kind: string, startingBlock: number): Promise { // Always use the checksum address (https://docs.ethers.io/v5/api/utils/address/#utils-getAddress). const contractAddress = ethers.utils.getAddress(address); + const dbTx = await db.createTransactionRunner(); - await db.saveContract(contractAddress, kind, startingBlock); + try { + await db.saveContract(dbTx, contractAddress, kind, startingBlock); + await dbTx.commitTransaction(); + } catch (error) { + await dbTx.rollbackTransaction(); + throw error; + } finally { + await dbTx.release(); + } } export const watchEvent = async (uniClient: UniClient, eventType: string): Promise => {